subscribe.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package packets
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. )
  7. // SubscribePacket is an internal representation of the fields of the
  8. // Subscribe MQTT packet
  9. type SubscribePacket struct {
  10. FixedHeader
  11. MessageID uint16
  12. Topics []string
  13. Qoss []byte
  14. }
  15. func (s *SubscribePacket) String() string {
  16. return fmt.Sprintf("%s MessageID: %d topics: %s", s.FixedHeader, s.MessageID, s.Topics)
  17. }
  18. func (s *SubscribePacket) Write(w io.Writer) error {
  19. var body bytes.Buffer
  20. var err error
  21. body.Write(encodeUint16(s.MessageID))
  22. for i, topic := range s.Topics {
  23. body.Write(encodeString(topic))
  24. body.WriteByte(s.Qoss[i])
  25. }
  26. s.FixedHeader.RemainingLength = body.Len()
  27. packet := s.FixedHeader.pack()
  28. packet.Write(body.Bytes())
  29. _, err = packet.WriteTo(w)
  30. return err
  31. }
  32. // Unpack decodes the details of a ControlPacket after the fixed
  33. // header has been read
  34. func (s *SubscribePacket) Unpack(b io.Reader) error {
  35. var err error
  36. s.MessageID, err = decodeUint16(b)
  37. if err != nil {
  38. return err
  39. }
  40. payloadLength := s.FixedHeader.RemainingLength - 2
  41. for payloadLength > 0 {
  42. topic, err := decodeString(b)
  43. if err != nil {
  44. return err
  45. }
  46. s.Topics = append(s.Topics, topic)
  47. qos, err := decodeByte(b)
  48. if err != nil {
  49. return err
  50. }
  51. s.Qoss = append(s.Qoss, qos)
  52. payloadLength -= 2 + len(topic) + 1 // 2 bytes of string length, plus string, plus 1 byte for Qos
  53. }
  54. return nil
  55. }
  56. // Details returns a Details struct containing the Qos and
  57. // MessageID of this ControlPacket
  58. func (s *SubscribePacket) Details() Details {
  59. return Details{Qos: 1, MessageID: s.MessageID}
  60. }