decoder.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. // Copyright 2019+ Klaus Post. All rights reserved.
  2. // License information can be found in the LICENSE file.
  3. // Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "errors"
  7. "io"
  8. "sync"
  9. )
  10. // Decoder provides decoding of zstandard streams.
  11. // The decoder has been designed to operate without allocations after a warmup.
  12. // This means that you should store the decoder for best performance.
  13. // To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
  14. // A decoder can safely be re-used even if the previous stream failed.
  15. // To release the resources, you must call the Close() function on a decoder.
  16. type Decoder struct {
  17. o decoderOptions
  18. // Unreferenced decoders, ready for use.
  19. decoders chan *blockDec
  20. // Streams ready to be decoded.
  21. stream chan decodeStream
  22. // Current read position used for Reader functionality.
  23. current decoderState
  24. // Custom dictionaries.
  25. // Always uses copies.
  26. dicts map[uint32]dict
  27. // streamWg is the waitgroup for all streams
  28. streamWg sync.WaitGroup
  29. }
  30. // decoderState is used for maintaining state when the decoder
  31. // is used for streaming.
  32. type decoderState struct {
  33. // current block being written to stream.
  34. decodeOutput
  35. // output in order to be written to stream.
  36. output chan decodeOutput
  37. // cancel remaining output.
  38. cancel chan struct{}
  39. flushed bool
  40. }
  41. var (
  42. // Check the interfaces we want to support.
  43. _ = io.WriterTo(&Decoder{})
  44. _ = io.Reader(&Decoder{})
  45. )
  46. // NewReader creates a new decoder.
  47. // A nil Reader can be provided in which case Reset can be used to start a decode.
  48. //
  49. // A Decoder can be used in two modes:
  50. //
  51. // 1) As a stream, or
  52. // 2) For stateless decoding using DecodeAll.
  53. //
  54. // Only a single stream can be decoded concurrently, but the same decoder
  55. // can run multiple concurrent stateless decodes. It is even possible to
  56. // use stateless decodes while a stream is being decoded.
  57. //
  58. // The Reset function can be used to initiate a new stream, which is will considerably
  59. // reduce the allocations normally caused by NewReader.
  60. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  61. initPredefined()
  62. var d Decoder
  63. d.o.setDefault()
  64. for _, o := range opts {
  65. err := o(&d.o)
  66. if err != nil {
  67. return nil, err
  68. }
  69. }
  70. d.current.output = make(chan decodeOutput, d.o.concurrent)
  71. d.current.flushed = true
  72. if r == nil {
  73. d.current.err = ErrDecoderNilInput
  74. }
  75. // Transfer option dicts.
  76. d.dicts = make(map[uint32]dict, len(d.o.dicts))
  77. for _, dc := range d.o.dicts {
  78. d.dicts[dc.id] = dc
  79. }
  80. d.o.dicts = nil
  81. // Create decoders
  82. d.decoders = make(chan *blockDec, d.o.concurrent)
  83. for i := 0; i < d.o.concurrent; i++ {
  84. dec := newBlockDec(d.o.lowMem)
  85. dec.localFrame = newFrameDec(d.o)
  86. d.decoders <- dec
  87. }
  88. if r == nil {
  89. return &d, nil
  90. }
  91. return &d, d.Reset(r)
  92. }
  93. // Read bytes from the decompressed stream into p.
  94. // Returns the number of bytes written and any error that occurred.
  95. // When the stream is done, io.EOF will be returned.
  96. func (d *Decoder) Read(p []byte) (int, error) {
  97. var n int
  98. for {
  99. if len(d.current.b) > 0 {
  100. filled := copy(p, d.current.b)
  101. p = p[filled:]
  102. d.current.b = d.current.b[filled:]
  103. n += filled
  104. }
  105. if len(p) == 0 {
  106. break
  107. }
  108. if len(d.current.b) == 0 {
  109. // We have an error and no more data
  110. if d.current.err != nil {
  111. break
  112. }
  113. if !d.nextBlock(n == 0) {
  114. return n, nil
  115. }
  116. }
  117. }
  118. if len(d.current.b) > 0 {
  119. if debugDecoder {
  120. println("returning", n, "still bytes left:", len(d.current.b))
  121. }
  122. // Only return error at end of block
  123. return n, nil
  124. }
  125. if d.current.err != nil {
  126. d.drainOutput()
  127. }
  128. if debugDecoder {
  129. println("returning", n, d.current.err, len(d.decoders))
  130. }
  131. return n, d.current.err
  132. }
  133. // Reset will reset the decoder the supplied stream after the current has finished processing.
  134. // Note that this functionality cannot be used after Close has been called.
  135. // Reset can be called with a nil reader to release references to the previous reader.
  136. // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
  137. // should be used.
  138. func (d *Decoder) Reset(r io.Reader) error {
  139. if d.current.err == ErrDecoderClosed {
  140. return d.current.err
  141. }
  142. d.drainOutput()
  143. if r == nil {
  144. d.current.err = ErrDecoderNilInput
  145. if len(d.current.b) > 0 {
  146. d.current.b = d.current.b[:0]
  147. }
  148. d.current.flushed = true
  149. return nil
  150. }
  151. // If bytes buffer and < 5MB, do sync decoding anyway.
  152. if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
  153. bb2 := bb
  154. if debugDecoder {
  155. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  156. }
  157. b := bb2.Bytes()
  158. var dst []byte
  159. if cap(d.current.b) > 0 {
  160. dst = d.current.b
  161. }
  162. dst, err := d.DecodeAll(b, dst[:0])
  163. if err == nil {
  164. err = io.EOF
  165. }
  166. d.current.b = dst
  167. d.current.err = err
  168. d.current.flushed = true
  169. if debugDecoder {
  170. println("sync decode to", len(dst), "bytes, err:", err)
  171. }
  172. return nil
  173. }
  174. if d.stream == nil {
  175. d.stream = make(chan decodeStream, 1)
  176. d.streamWg.Add(1)
  177. go d.startStreamDecoder(d.stream)
  178. }
  179. // Remove current block.
  180. d.current.decodeOutput = decodeOutput{}
  181. d.current.err = nil
  182. d.current.cancel = make(chan struct{})
  183. d.current.flushed = false
  184. d.current.d = nil
  185. d.stream <- decodeStream{
  186. r: r,
  187. output: d.current.output,
  188. cancel: d.current.cancel,
  189. }
  190. return nil
  191. }
  192. // drainOutput will drain the output until errEndOfStream is sent.
  193. func (d *Decoder) drainOutput() {
  194. if d.current.cancel != nil {
  195. println("cancelling current")
  196. close(d.current.cancel)
  197. d.current.cancel = nil
  198. }
  199. if d.current.d != nil {
  200. if debugDecoder {
  201. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  202. }
  203. d.decoders <- d.current.d
  204. d.current.d = nil
  205. d.current.b = nil
  206. }
  207. if d.current.output == nil || d.current.flushed {
  208. println("current already flushed")
  209. return
  210. }
  211. for v := range d.current.output {
  212. if v.d != nil {
  213. if debugDecoder {
  214. printf("re-adding decoder %p", v.d)
  215. }
  216. d.decoders <- v.d
  217. }
  218. if v.err == errEndOfStream {
  219. println("current flushed")
  220. d.current.flushed = true
  221. return
  222. }
  223. }
  224. }
  225. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  226. // The return value n is the number of bytes written.
  227. // Any error encountered during the write is also returned.
  228. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  229. var n int64
  230. for {
  231. if len(d.current.b) > 0 {
  232. n2, err2 := w.Write(d.current.b)
  233. n += int64(n2)
  234. if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
  235. d.current.err = err2
  236. } else if n2 != len(d.current.b) {
  237. d.current.err = io.ErrShortWrite
  238. }
  239. }
  240. if d.current.err != nil {
  241. break
  242. }
  243. d.nextBlock(true)
  244. }
  245. err := d.current.err
  246. if err != nil {
  247. d.drainOutput()
  248. }
  249. if err == io.EOF {
  250. err = nil
  251. }
  252. return n, err
  253. }
  254. // DecodeAll allows stateless decoding of a blob of bytes.
  255. // Output will be appended to dst, so if the destination size is known
  256. // you can pre-allocate the destination slice to avoid allocations.
  257. // DecodeAll can be used concurrently.
  258. // The Decoder concurrency limits will be respected.
  259. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  260. if d.current.err == ErrDecoderClosed {
  261. return dst, ErrDecoderClosed
  262. }
  263. // Grab a block decoder and frame decoder.
  264. block := <-d.decoders
  265. frame := block.localFrame
  266. defer func() {
  267. if debugDecoder {
  268. printf("re-adding decoder: %p", block)
  269. }
  270. frame.rawInput = nil
  271. frame.bBuf = nil
  272. d.decoders <- block
  273. }()
  274. frame.bBuf = input
  275. for {
  276. frame.history.reset()
  277. err := frame.reset(&frame.bBuf)
  278. if err == io.EOF {
  279. if debugDecoder {
  280. println("frame reset return EOF")
  281. }
  282. return dst, nil
  283. }
  284. if frame.DictionaryID != nil {
  285. dict, ok := d.dicts[*frame.DictionaryID]
  286. if !ok {
  287. return nil, ErrUnknownDictionary
  288. }
  289. frame.history.setDict(&dict)
  290. }
  291. if err != nil {
  292. return dst, err
  293. }
  294. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
  295. return dst, ErrDecoderSizeExceeded
  296. }
  297. if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
  298. // Never preallocate moe than 1 GB up front.
  299. if cap(dst)-len(dst) < int(frame.FrameContentSize) {
  300. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
  301. copy(dst2, dst)
  302. dst = dst2
  303. }
  304. }
  305. if cap(dst) == 0 {
  306. // Allocate len(input) * 2 by default if nothing is provided
  307. // and we didn't get frame content size.
  308. size := len(input) * 2
  309. // Cap to 1 MB.
  310. if size > 1<<20 {
  311. size = 1 << 20
  312. }
  313. if uint64(size) > d.o.maxDecodedSize {
  314. size = int(d.o.maxDecodedSize)
  315. }
  316. dst = make([]byte, 0, size)
  317. }
  318. dst, err = frame.runDecoder(dst, block)
  319. if err != nil {
  320. return dst, err
  321. }
  322. if len(frame.bBuf) == 0 {
  323. if debugDecoder {
  324. println("frame dbuf empty")
  325. }
  326. break
  327. }
  328. }
  329. return dst, nil
  330. }
  331. // nextBlock returns the next block.
  332. // If an error occurs d.err will be set.
  333. // Optionally the function can block for new output.
  334. // If non-blocking mode is used the returned boolean will be false
  335. // if no data was available without blocking.
  336. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  337. if d.current.d != nil {
  338. if debugDecoder {
  339. printf("re-adding current decoder %p", d.current.d)
  340. }
  341. d.decoders <- d.current.d
  342. d.current.d = nil
  343. }
  344. if d.current.err != nil {
  345. // Keep error state.
  346. return blocking
  347. }
  348. if blocking {
  349. d.current.decodeOutput = <-d.current.output
  350. } else {
  351. select {
  352. case d.current.decodeOutput = <-d.current.output:
  353. default:
  354. return false
  355. }
  356. }
  357. if debugDecoder {
  358. println("got", len(d.current.b), "bytes, error:", d.current.err)
  359. }
  360. return true
  361. }
  362. // Close will release all resources.
  363. // It is NOT possible to reuse the decoder after this.
  364. func (d *Decoder) Close() {
  365. if d.current.err == ErrDecoderClosed {
  366. return
  367. }
  368. d.drainOutput()
  369. if d.stream != nil {
  370. close(d.stream)
  371. d.streamWg.Wait()
  372. d.stream = nil
  373. }
  374. if d.decoders != nil {
  375. close(d.decoders)
  376. for dec := range d.decoders {
  377. dec.Close()
  378. }
  379. d.decoders = nil
  380. }
  381. if d.current.d != nil {
  382. d.current.d.Close()
  383. d.current.d = nil
  384. }
  385. d.current.err = ErrDecoderClosed
  386. }
  387. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  388. // Any changes to the decoder will be reflected, so the returned ReadCloser
  389. // can be reused along with the decoder.
  390. // io.WriterTo is also supported by the returned ReadCloser.
  391. func (d *Decoder) IOReadCloser() io.ReadCloser {
  392. return closeWrapper{d: d}
  393. }
  394. // closeWrapper wraps a function call as a closer.
  395. type closeWrapper struct {
  396. d *Decoder
  397. }
  398. // WriteTo forwards WriteTo calls to the decoder.
  399. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  400. return c.d.WriteTo(w)
  401. }
  402. // Read forwards read calls to the decoder.
  403. func (c closeWrapper) Read(p []byte) (n int, err error) {
  404. return c.d.Read(p)
  405. }
  406. // Close closes the decoder.
  407. func (c closeWrapper) Close() error {
  408. c.d.Close()
  409. return nil
  410. }
  411. type decodeOutput struct {
  412. d *blockDec
  413. b []byte
  414. err error
  415. }
  416. type decodeStream struct {
  417. r io.Reader
  418. // Blocks ready to be written to output.
  419. output chan decodeOutput
  420. // cancel reading from the input
  421. cancel chan struct{}
  422. }
  // errEndOfStream is sent on the output once everything from the stream was read.
  var errEndOfStream = errors.New("end-of-stream")
  425. // Create Decoder:
  426. // Spawn n block decoders. These accept tasks to decode a block.
  427. // Create goroutine that handles stream processing, this will send history to decoders as they are available.
  428. // Decoders update the history as they decode.
  429. // When a block is returned:
  430. // a) history is sent to the next decoder,
  431. // b) content written to CRC.
  432. // c) return data to WRITER.
  433. // d) wait for next block to return data.
  434. // Once WRITTEN, the decoders reused by the writer frame decoder for re-use.
  435. func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
  436. defer d.streamWg.Done()
  437. frame := newFrameDec(d.o)
  438. for stream := range inStream {
  439. if debugDecoder {
  440. println("got new stream")
  441. }
  442. br := readerWrapper{r: stream.r}
  443. decodeStream:
  444. for {
  445. frame.history.reset()
  446. err := frame.reset(&br)
  447. if debugDecoder && err != nil {
  448. println("Frame decoder returned", err)
  449. }
  450. if err == nil && frame.DictionaryID != nil {
  451. dict, ok := d.dicts[*frame.DictionaryID]
  452. if !ok {
  453. err = ErrUnknownDictionary
  454. } else {
  455. frame.history.setDict(&dict)
  456. }
  457. }
  458. if err != nil {
  459. stream.output <- decodeOutput{
  460. err: err,
  461. }
  462. break
  463. }
  464. if debugDecoder {
  465. println("starting frame decoder")
  466. }
  467. // This goroutine will forward history between frames.
  468. frame.frameDone.Add(1)
  469. frame.initAsync()
  470. go frame.startDecoder(stream.output)
  471. decodeFrame:
  472. // Go through all blocks of the frame.
  473. for {
  474. dec := <-d.decoders
  475. select {
  476. case <-stream.cancel:
  477. if !frame.sendErr(dec, io.EOF) {
  478. // To not let the decoder dangle, send it back.
  479. stream.output <- decodeOutput{d: dec}
  480. }
  481. break decodeStream
  482. default:
  483. }
  484. err := frame.next(dec)
  485. switch err {
  486. case io.EOF:
  487. // End of current frame, no error
  488. println("EOF on next block")
  489. break decodeFrame
  490. case nil:
  491. continue
  492. default:
  493. println("block decoder returned", err)
  494. break decodeStream
  495. }
  496. }
  497. // All blocks have started decoding, check if there are more frames.
  498. println("waiting for done")
  499. frame.frameDone.Wait()
  500. println("done waiting...")
  501. }
  502. frame.frameDone.Wait()
  503. println("Sending EOS")
  504. stream.output <- decodeOutput{err: errEndOfStream}
  505. }
  506. }