encoder_decoder.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package sarama
  2. import (
  3. "fmt"
  4. "github.com/rcrowley/go-metrics"
  5. )
  6. // Encoder is the interface that wraps the basic Encode method.
  7. // Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
  8. type encoder interface {
  9. encode(pe packetEncoder) error
  10. }
  11. type encoderWithHeader interface {
  12. encoder
  13. headerVersion() int16
  14. }
  15. // Encode takes an Encoder and turns it into bytes while potentially recording metrics.
  16. func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
  17. if e == nil {
  18. return nil, nil
  19. }
  20. var prepEnc prepEncoder
  21. var realEnc realEncoder
  22. err := e.encode(&prepEnc)
  23. if err != nil {
  24. return nil, err
  25. }
  26. if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
  27. return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
  28. }
  29. realEnc.raw = make([]byte, prepEnc.length)
  30. realEnc.registry = metricRegistry
  31. err = e.encode(&realEnc)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return realEnc.raw, nil
  36. }
  37. // decoder is the interface that wraps the basic Decode method.
  38. // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
  39. type decoder interface {
  40. decode(pd packetDecoder) error
  41. }
  42. type versionedDecoder interface {
  43. decode(pd packetDecoder, version int16) error
  44. }
  45. // decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
  46. // interpreted using Kafka's encoding rules.
  47. func decode(buf []byte, in decoder) error {
  48. if buf == nil {
  49. return nil
  50. }
  51. helper := realDecoder{raw: buf}
  52. err := in.decode(&helper)
  53. if err != nil {
  54. return err
  55. }
  56. if helper.off != len(buf) {
  57. return PacketDecodingError{"invalid length"}
  58. }
  59. return nil
  60. }
  61. func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
  62. if buf == nil {
  63. return nil
  64. }
  65. helper := realDecoder{raw: buf}
  66. err := in.decode(&helper, version)
  67. if err != nil {
  68. return err
  69. }
  70. if helper.off != len(buf) {
  71. return PacketDecodingError{"invalid length"}
  72. }
  73. return nil
  74. }