decompress.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "sync"
  8. snappy "github.com/eapache/go-xerial-snappy"
  9. "github.com/pierrec/lz4"
  10. )
  11. var (
  12. lz4ReaderPool = sync.Pool{
  13. New: func() interface{} {
  14. return lz4.NewReader(nil)
  15. },
  16. }
  17. gzipReaderPool sync.Pool
  18. )
  19. func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
  20. switch cc {
  21. case CompressionNone:
  22. return data, nil
  23. case CompressionGZIP:
  24. var err error
  25. reader, ok := gzipReaderPool.Get().(*gzip.Reader)
  26. if !ok {
  27. reader, err = gzip.NewReader(bytes.NewReader(data))
  28. } else {
  29. err = reader.Reset(bytes.NewReader(data))
  30. }
  31. if err != nil {
  32. return nil, err
  33. }
  34. defer gzipReaderPool.Put(reader)
  35. return io.ReadAll(reader)
  36. case CompressionSnappy:
  37. return snappy.Decode(data)
  38. case CompressionLZ4:
  39. reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
  40. if !ok {
  41. reader = lz4.NewReader(bytes.NewReader(data))
  42. } else {
  43. reader.Reset(bytes.NewReader(data))
  44. }
  45. defer lz4ReaderPool.Put(reader)
  46. return io.ReadAll(reader)
  47. case CompressionZSTD:
  48. return zstdDecompress(ZstdDecoderParams{}, nil, data)
  49. default:
  50. return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
  51. }
  52. }