12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package sarama
- import (
- "bytes"
- "compress/gzip"
- "fmt"
- "io"
- "sync"
- snappy "github.com/eapache/go-xerial-snappy"
- "github.com/pierrec/lz4"
- )
- var (
- lz4ReaderPool = sync.Pool{
- New: func() interface{} {
- return lz4.NewReader(nil)
- },
- }
- gzipReaderPool sync.Pool
- )
- func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
- switch cc {
- case CompressionNone:
- return data, nil
- case CompressionGZIP:
- var err error
- reader, ok := gzipReaderPool.Get().(*gzip.Reader)
- if !ok {
- reader, err = gzip.NewReader(bytes.NewReader(data))
- } else {
- err = reader.Reset(bytes.NewReader(data))
- }
- if err != nil {
- return nil, err
- }
- defer gzipReaderPool.Put(reader)
- return io.ReadAll(reader)
- case CompressionSnappy:
- return snappy.Decode(data)
- case CompressionLZ4:
- reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
- if !ok {
- reader = lz4.NewReader(bytes.NewReader(data))
- } else {
- reader.Reset(bytes.NewReader(data))
- }
- defer lz4ReaderPool.Put(reader)
- return io.ReadAll(reader)
- case CompressionZSTD:
- return zstdDecompress(ZstdDecoderParams{}, nil, data)
- default:
- return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
- }
- }
|