websocket.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package mqtt
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "io"
  6. "net"
  7. "net/http"
  8. "net/url"
  9. "sync"
  10. "time"
  11. "github.com/gorilla/websocket"
  12. )
  13. // WebsocketOptions are config options for a websocket dialer
  14. type WebsocketOptions struct {
  15. ReadBufferSize int
  16. WriteBufferSize int
  17. Proxy ProxyFunction
  18. }
  19. type ProxyFunction func(req *http.Request) (*url.URL, error)
  20. // NewWebsocket returns a new websocket and returns a net.Conn compatible interface using the gorilla/websocket package
  21. func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header, options *WebsocketOptions) (net.Conn, error) {
  22. if timeout == 0 {
  23. timeout = 10 * time.Second
  24. }
  25. if options == nil {
  26. // Apply default options
  27. options = &WebsocketOptions{}
  28. }
  29. if options.Proxy == nil {
  30. options.Proxy = http.ProxyFromEnvironment
  31. }
  32. dialer := &websocket.Dialer{
  33. Proxy: options.Proxy,
  34. HandshakeTimeout: timeout,
  35. EnableCompression: false,
  36. TLSClientConfig: tlsc,
  37. Subprotocols: []string{"mqtt"},
  38. ReadBufferSize: options.ReadBufferSize,
  39. WriteBufferSize: options.WriteBufferSize,
  40. }
  41. ws, resp, err := dialer.Dial(host, requestHeader)
  42. if err != nil {
  43. if resp != nil {
  44. WARN.Println(CLI, fmt.Sprintf("Websocket handshake failure. StatusCode: %d. Body: %s", resp.StatusCode, resp.Body))
  45. }
  46. return nil, err
  47. }
  48. wrapper := &websocketConnector{
  49. Conn: ws,
  50. }
  51. return wrapper, err
  52. }
  53. // websocketConnector is a websocket wrapper so it satisfies the net.Conn interface so it is a
  54. // drop in replacement of the golang.org/x/net/websocket package.
  55. // Implementation guide taken from https://github.com/gorilla/websocket/issues/282
  56. type websocketConnector struct {
  57. *websocket.Conn
  58. r io.Reader
  59. rio sync.Mutex
  60. wio sync.Mutex
  61. }
  62. // SetDeadline sets both the read and write deadlines
  63. func (c *websocketConnector) SetDeadline(t time.Time) error {
  64. if err := c.SetReadDeadline(t); err != nil {
  65. return err
  66. }
  67. err := c.SetWriteDeadline(t)
  68. return err
  69. }
  70. // Write writes data to the websocket
  71. func (c *websocketConnector) Write(p []byte) (int, error) {
  72. c.wio.Lock()
  73. defer c.wio.Unlock()
  74. err := c.WriteMessage(websocket.BinaryMessage, p)
  75. if err != nil {
  76. return 0, err
  77. }
  78. return len(p), nil
  79. }
  80. // Read reads the current websocket frame
  81. func (c *websocketConnector) Read(p []byte) (int, error) {
  82. c.rio.Lock()
  83. defer c.rio.Unlock()
  84. for {
  85. if c.r == nil {
  86. // Advance to next message.
  87. var err error
  88. _, c.r, err = c.NextReader()
  89. if err != nil {
  90. return 0, err
  91. }
  92. }
  93. n, err := c.r.Read(p)
  94. if err == io.EOF {
  95. // At end of message.
  96. c.r = nil
  97. if n > 0 {
  98. return n, nil
  99. }
  100. // No data read, continue to next message.
  101. continue
  102. }
  103. return n, err
  104. }
  105. }