message.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "net/url"
  17. "sync"
  18. "github.com/eclipse/paho.mqtt.golang/packets"
  19. )
  20. // Message defines the externals that a message implementation must support
  21. // these are received messages that are passed to the callbacks, not internal
  22. // messages
  23. type Message interface {
  24. Duplicate() bool
  25. Qos() byte
  26. Retained() bool
  27. Topic() string
  28. MessageID() uint16
  29. Payload() []byte
  30. Ack()
  31. }
  32. type message struct {
  33. duplicate bool
  34. qos byte
  35. retained bool
  36. topic string
  37. messageID uint16
  38. payload []byte
  39. once sync.Once
  40. ack func()
  41. }
  42. func (m *message) Duplicate() bool {
  43. return m.duplicate
  44. }
  45. func (m *message) Qos() byte {
  46. return m.qos
  47. }
  48. func (m *message) Retained() bool {
  49. return m.retained
  50. }
  51. func (m *message) Topic() string {
  52. return m.topic
  53. }
  54. func (m *message) MessageID() uint16 {
  55. return m.messageID
  56. }
  57. func (m *message) Payload() []byte {
  58. return m.payload
  59. }
  60. func (m *message) Ack() {
  61. m.once.Do(m.ack)
  62. }
  63. func messageFromPublish(p *packets.PublishPacket, ack func()) Message {
  64. return &message{
  65. duplicate: p.Dup,
  66. qos: p.Qos,
  67. retained: p.Retain,
  68. topic: p.TopicName,
  69. messageID: p.MessageID,
  70. payload: p.Payload,
  71. ack: ack,
  72. }
  73. }
  74. func newConnectMsgFromOptions(options *ClientOptions, broker *url.URL) *packets.ConnectPacket {
  75. m := packets.NewControlPacket(packets.Connect).(*packets.ConnectPacket)
  76. m.CleanSession = options.CleanSession
  77. m.WillFlag = options.WillEnabled
  78. m.WillRetain = options.WillRetained
  79. m.ClientIdentifier = options.ClientID
  80. if options.WillEnabled {
  81. m.WillQos = options.WillQos
  82. m.WillTopic = options.WillTopic
  83. m.WillMessage = options.WillPayload
  84. }
  85. username := options.Username
  86. password := options.Password
  87. if broker.User != nil {
  88. username = broker.User.Username()
  89. if pwd, ok := broker.User.Password(); ok {
  90. password = pwd
  91. }
  92. }
  93. if options.CredentialsProvider != nil {
  94. username, password = options.CredentialsProvider()
  95. }
  96. if username != "" {
  97. m.UsernameFlag = true
  98. m.Username = username
  99. // mustn't have password without user as well
  100. if password != "" {
  101. m.PasswordFlag = true
  102. m.Password = []byte(password)
  103. }
  104. }
  105. m.Keepalive = uint16(options.KeepAlive)
  106. return m
  107. }