fetch_response.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. package sarama
  2. import (
  3. "sort"
  4. "time"
  5. )
  6. const invalidPreferredReplicaID = -1
  7. type AbortedTransaction struct {
  8. ProducerID int64
  9. FirstOffset int64
  10. }
  11. func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
  12. if t.ProducerID, err = pd.getInt64(); err != nil {
  13. return err
  14. }
  15. if t.FirstOffset, err = pd.getInt64(); err != nil {
  16. return err
  17. }
  18. return nil
  19. }
  20. func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
  21. pe.putInt64(t.ProducerID)
  22. pe.putInt64(t.FirstOffset)
  23. return nil
  24. }
  25. type FetchResponseBlock struct {
  26. Err KError
  27. HighWaterMarkOffset int64
  28. LastStableOffset int64
  29. LastRecordsBatchOffset *int64
  30. LogStartOffset int64
  31. AbortedTransactions []*AbortedTransaction
  32. PreferredReadReplica int32
  33. Records *Records // deprecated: use FetchResponseBlock.RecordsSet
  34. RecordsSet []*Records
  35. Partial bool
  36. }
  37. func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
  38. tmp, err := pd.getInt16()
  39. if err != nil {
  40. return err
  41. }
  42. b.Err = KError(tmp)
  43. b.HighWaterMarkOffset, err = pd.getInt64()
  44. if err != nil {
  45. return err
  46. }
  47. if version >= 4 {
  48. b.LastStableOffset, err = pd.getInt64()
  49. if err != nil {
  50. return err
  51. }
  52. if version >= 5 {
  53. b.LogStartOffset, err = pd.getInt64()
  54. if err != nil {
  55. return err
  56. }
  57. }
  58. numTransact, err := pd.getArrayLength()
  59. if err != nil {
  60. return err
  61. }
  62. if numTransact >= 0 {
  63. b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
  64. }
  65. for i := 0; i < numTransact; i++ {
  66. transact := new(AbortedTransaction)
  67. if err = transact.decode(pd); err != nil {
  68. return err
  69. }
  70. b.AbortedTransactions[i] = transact
  71. }
  72. }
  73. if version >= 11 {
  74. b.PreferredReadReplica, err = pd.getInt32()
  75. if err != nil {
  76. return err
  77. }
  78. } else {
  79. b.PreferredReadReplica = -1
  80. }
  81. recordsSize, err := pd.getInt32()
  82. if err != nil {
  83. return err
  84. }
  85. recordsDecoder, err := pd.getSubset(int(recordsSize))
  86. if err != nil {
  87. return err
  88. }
  89. b.RecordsSet = []*Records{}
  90. for recordsDecoder.remaining() > 0 {
  91. records := &Records{}
  92. if err := records.decode(recordsDecoder); err != nil {
  93. // If we have at least one decoded records, this is not an error
  94. if err == ErrInsufficientData {
  95. if len(b.RecordsSet) == 0 {
  96. b.Partial = true
  97. }
  98. break
  99. }
  100. return err
  101. }
  102. b.LastRecordsBatchOffset, err = records.recordsOffset()
  103. if err != nil {
  104. return err
  105. }
  106. partial, err := records.isPartial()
  107. if err != nil {
  108. return err
  109. }
  110. n, err := records.numRecords()
  111. if err != nil {
  112. return err
  113. }
  114. if n > 0 || (partial && len(b.RecordsSet) == 0) {
  115. b.RecordsSet = append(b.RecordsSet, records)
  116. if b.Records == nil {
  117. b.Records = records
  118. }
  119. }
  120. overflow, err := records.isOverflow()
  121. if err != nil {
  122. return err
  123. }
  124. if partial || overflow {
  125. break
  126. }
  127. }
  128. return nil
  129. }
  130. func (b *FetchResponseBlock) numRecords() (int, error) {
  131. sum := 0
  132. for _, records := range b.RecordsSet {
  133. count, err := records.numRecords()
  134. if err != nil {
  135. return 0, err
  136. }
  137. sum += count
  138. }
  139. return sum, nil
  140. }
  141. func (b *FetchResponseBlock) isPartial() (bool, error) {
  142. if b.Partial {
  143. return true, nil
  144. }
  145. if len(b.RecordsSet) == 1 {
  146. return b.RecordsSet[0].isPartial()
  147. }
  148. return false, nil
  149. }
  150. func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
  151. pe.putInt16(int16(b.Err))
  152. pe.putInt64(b.HighWaterMarkOffset)
  153. if version >= 4 {
  154. pe.putInt64(b.LastStableOffset)
  155. if version >= 5 {
  156. pe.putInt64(b.LogStartOffset)
  157. }
  158. if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
  159. return err
  160. }
  161. for _, transact := range b.AbortedTransactions {
  162. if err = transact.encode(pe); err != nil {
  163. return err
  164. }
  165. }
  166. }
  167. if version >= 11 {
  168. pe.putInt32(b.PreferredReadReplica)
  169. }
  170. pe.push(&lengthField{})
  171. for _, records := range b.RecordsSet {
  172. err = records.encode(pe)
  173. if err != nil {
  174. return err
  175. }
  176. }
  177. return pe.pop()
  178. }
  179. func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
  180. // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
  181. // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
  182. at := b.AbortedTransactions
  183. sort.Slice(
  184. at,
  185. func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
  186. )
  187. return at
  188. }
  189. type FetchResponse struct {
  190. Blocks map[string]map[int32]*FetchResponseBlock
  191. ThrottleTime time.Duration
  192. ErrorCode int16
  193. SessionID int32
  194. Version int16
  195. LogAppendTime bool
  196. Timestamp time.Time
  197. }
  198. func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
  199. r.Version = version
  200. if r.Version >= 1 {
  201. throttle, err := pd.getInt32()
  202. if err != nil {
  203. return err
  204. }
  205. r.ThrottleTime = time.Duration(throttle) * time.Millisecond
  206. }
  207. if r.Version >= 7 {
  208. r.ErrorCode, err = pd.getInt16()
  209. if err != nil {
  210. return err
  211. }
  212. r.SessionID, err = pd.getInt32()
  213. if err != nil {
  214. return err
  215. }
  216. }
  217. numTopics, err := pd.getArrayLength()
  218. if err != nil {
  219. return err
  220. }
  221. r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
  222. for i := 0; i < numTopics; i++ {
  223. name, err := pd.getString()
  224. if err != nil {
  225. return err
  226. }
  227. numBlocks, err := pd.getArrayLength()
  228. if err != nil {
  229. return err
  230. }
  231. r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
  232. for j := 0; j < numBlocks; j++ {
  233. id, err := pd.getInt32()
  234. if err != nil {
  235. return err
  236. }
  237. block := new(FetchResponseBlock)
  238. err = block.decode(pd, version)
  239. if err != nil {
  240. return err
  241. }
  242. r.Blocks[name][id] = block
  243. }
  244. }
  245. return nil
  246. }
  247. func (r *FetchResponse) encode(pe packetEncoder) (err error) {
  248. if r.Version >= 1 {
  249. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  250. }
  251. if r.Version >= 7 {
  252. pe.putInt16(r.ErrorCode)
  253. pe.putInt32(r.SessionID)
  254. }
  255. err = pe.putArrayLength(len(r.Blocks))
  256. if err != nil {
  257. return err
  258. }
  259. for topic, partitions := range r.Blocks {
  260. err = pe.putString(topic)
  261. if err != nil {
  262. return err
  263. }
  264. err = pe.putArrayLength(len(partitions))
  265. if err != nil {
  266. return err
  267. }
  268. for id, block := range partitions {
  269. pe.putInt32(id)
  270. err = block.encode(pe, r.Version)
  271. if err != nil {
  272. return err
  273. }
  274. }
  275. }
  276. return nil
  277. }
  278. func (r *FetchResponse) key() int16 {
  279. return 1
  280. }
  281. func (r *FetchResponse) version() int16 {
  282. return r.Version
  283. }
  284. func (r *FetchResponse) headerVersion() int16 {
  285. return 0
  286. }
  287. func (r *FetchResponse) requiredVersion() KafkaVersion {
  288. switch r.Version {
  289. case 0:
  290. return MinVersion
  291. case 1:
  292. return V0_9_0_0
  293. case 2:
  294. return V0_10_0_0
  295. case 3:
  296. return V0_10_1_0
  297. case 4, 5:
  298. return V0_11_0_0
  299. case 6:
  300. return V1_0_0_0
  301. case 7:
  302. return V1_1_0_0
  303. case 8:
  304. return V2_0_0_0
  305. case 9, 10:
  306. return V2_1_0_0
  307. case 11:
  308. return V2_3_0_0
  309. default:
  310. return MaxVersion
  311. }
  312. }
  313. func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
  314. if r.Blocks == nil {
  315. return nil
  316. }
  317. if r.Blocks[topic] == nil {
  318. return nil
  319. }
  320. return r.Blocks[topic][partition]
  321. }
  322. func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
  323. if r.Blocks == nil {
  324. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  325. }
  326. partitions, ok := r.Blocks[topic]
  327. if !ok {
  328. partitions = make(map[int32]*FetchResponseBlock)
  329. r.Blocks[topic] = partitions
  330. }
  331. frb, ok := partitions[partition]
  332. if !ok {
  333. frb = new(FetchResponseBlock)
  334. partitions[partition] = frb
  335. }
  336. frb.Err = err
  337. }
  338. func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
  339. if r.Blocks == nil {
  340. r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
  341. }
  342. partitions, ok := r.Blocks[topic]
  343. if !ok {
  344. partitions = make(map[int32]*FetchResponseBlock)
  345. r.Blocks[topic] = partitions
  346. }
  347. frb, ok := partitions[partition]
  348. if !ok {
  349. frb = new(FetchResponseBlock)
  350. partitions[partition] = frb
  351. }
  352. return frb
  353. }
  354. func encodeKV(key, value Encoder) ([]byte, []byte) {
  355. var kb []byte
  356. var vb []byte
  357. if key != nil {
  358. kb, _ = key.Encode()
  359. }
  360. if value != nil {
  361. vb, _ = value.Encode()
  362. }
  363. return kb, vb
  364. }
  365. func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
  366. frb := r.getOrCreateBlock(topic, partition)
  367. kb, vb := encodeKV(key, value)
  368. if r.LogAppendTime {
  369. timestamp = r.Timestamp
  370. }
  371. msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
  372. msgBlock := &MessageBlock{Msg: msg, Offset: offset}
  373. if len(frb.RecordsSet) == 0 {
  374. records := newLegacyRecords(&MessageSet{})
  375. frb.RecordsSet = []*Records{&records}
  376. }
  377. set := frb.RecordsSet[0].MsgSet
  378. set.Messages = append(set.Messages, msgBlock)
  379. }
  380. func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
  381. frb := r.getOrCreateBlock(topic, partition)
  382. kb, vb := encodeKV(key, value)
  383. if len(frb.RecordsSet) == 0 {
  384. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  385. frb.RecordsSet = []*Records{&records}
  386. }
  387. batch := frb.RecordsSet[0].RecordBatch
  388. rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  389. batch.addRecord(rec)
  390. }
  391. // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
  392. // But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
  393. // Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
  394. func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
  395. frb := r.getOrCreateBlock(topic, partition)
  396. kb, vb := encodeKV(key, value)
  397. records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
  398. batch := &RecordBatch{
  399. Version: 2,
  400. LogAppendTime: r.LogAppendTime,
  401. FirstTimestamp: timestamp,
  402. MaxTimestamp: r.Timestamp,
  403. FirstOffset: offset,
  404. LastOffsetDelta: 0,
  405. ProducerID: producerID,
  406. IsTransactional: isTransactional,
  407. }
  408. rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  409. batch.addRecord(rec)
  410. records.RecordBatch = batch
  411. frb.RecordsSet = append(frb.RecordsSet, &records)
  412. }
  413. func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
  414. frb := r.getOrCreateBlock(topic, partition)
  415. // batch
  416. batch := &RecordBatch{
  417. Version: 2,
  418. LogAppendTime: r.LogAppendTime,
  419. FirstTimestamp: timestamp,
  420. MaxTimestamp: r.Timestamp,
  421. FirstOffset: offset,
  422. LastOffsetDelta: 0,
  423. ProducerID: producerID,
  424. IsTransactional: true,
  425. Control: true,
  426. }
  427. // records
  428. records := newDefaultRecords(nil)
  429. records.RecordBatch = batch
  430. // record
  431. crAbort := ControlRecord{
  432. Version: 0,
  433. Type: recordType,
  434. }
  435. crKey := &realEncoder{raw: make([]byte, 4)}
  436. crValue := &realEncoder{raw: make([]byte, 6)}
  437. crAbort.encode(crKey, crValue)
  438. rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
  439. batch.addRecord(rec)
  440. frb.RecordsSet = append(frb.RecordsSet, &records)
  441. }
  442. func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
  443. r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
  444. }
  445. func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
  446. r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
  447. }
  448. func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
  449. r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
  450. }
  451. func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
  452. // define controlRecord key and value
  453. r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
  454. }
  455. func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
  456. frb := r.getOrCreateBlock(topic, partition)
  457. if len(frb.RecordsSet) == 0 {
  458. records := newDefaultRecords(&RecordBatch{Version: 2})
  459. frb.RecordsSet = []*Records{&records}
  460. }
  461. batch := frb.RecordsSet[0].RecordBatch
  462. batch.LastOffsetDelta = offset
  463. }
  464. func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
  465. frb := r.getOrCreateBlock(topic, partition)
  466. frb.LastStableOffset = offset
  467. }