mockresponses.go 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332
  1. package sarama
  2. import (
  3. "fmt"
  4. "strings"
  5. )
  6. // TestReporter has methods matching go's testing.T to avoid importing
  7. // `testing` in the main part of the library.
  8. type TestReporter interface {
  9. Error(...interface{})
  10. Errorf(string, ...interface{})
  11. Fatal(...interface{})
  12. Fatalf(string, ...interface{})
  13. }
  14. // MockResponse is a response builder interface it defines one method that
  15. // allows generating a response based on a request body. MockResponses are used
  16. // to program behavior of MockBroker in tests.
  17. type MockResponse interface {
  18. For(reqBody versionedDecoder) (res encoderWithHeader)
  19. }
  20. // MockWrapper is a mock response builder that returns a particular concrete
  21. // response regardless of the actual request passed to the `For` method.
  22. type MockWrapper struct {
  23. res encoderWithHeader
  24. }
  25. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
  26. return mw.res
  27. }
  28. func NewMockWrapper(res encoderWithHeader) *MockWrapper {
  29. return &MockWrapper{res: res}
  30. }
  31. // MockSequence is a mock response builder that is created from a sequence of
  32. // concrete responses. Every time when a `MockBroker` calls its `For` method
  33. // the next response from the sequence is returned. When the end of the
  34. // sequence is reached the last element from the sequence is returned.
  35. type MockSequence struct {
  36. responses []MockResponse
  37. }
  38. func NewMockSequence(responses ...interface{}) *MockSequence {
  39. ms := &MockSequence{}
  40. ms.responses = make([]MockResponse, len(responses))
  41. for i, res := range responses {
  42. switch res := res.(type) {
  43. case MockResponse:
  44. ms.responses[i] = res
  45. case encoderWithHeader:
  46. ms.responses[i] = NewMockWrapper(res)
  47. default:
  48. panic(fmt.Sprintf("Unexpected response type: %T", res))
  49. }
  50. }
  51. return ms
  52. }
  53. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
  54. res = mc.responses[0].For(reqBody)
  55. if len(mc.responses) > 1 {
  56. mc.responses = mc.responses[1:]
  57. }
  58. return res
  59. }
  60. type MockListGroupsResponse struct {
  61. groups map[string]string
  62. t TestReporter
  63. }
  64. func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
  65. return &MockListGroupsResponse{
  66. groups: make(map[string]string),
  67. t: t,
  68. }
  69. }
  70. func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  71. request := reqBody.(*ListGroupsRequest)
  72. _ = request
  73. response := &ListGroupsResponse{
  74. Groups: m.groups,
  75. }
  76. return response
  77. }
  78. func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
  79. m.groups[groupID] = protocolType
  80. return m
  81. }
  82. type MockDescribeGroupsResponse struct {
  83. groups map[string]*GroupDescription
  84. t TestReporter
  85. }
  86. func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
  87. return &MockDescribeGroupsResponse{
  88. t: t,
  89. groups: make(map[string]*GroupDescription),
  90. }
  91. }
  92. func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
  93. m.groups[groupID] = description
  94. return m
  95. }
  96. func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  97. request := reqBody.(*DescribeGroupsRequest)
  98. response := &DescribeGroupsResponse{}
  99. for _, requestedGroup := range request.Groups {
  100. if group, ok := m.groups[requestedGroup]; ok {
  101. response.Groups = append(response.Groups, group)
  102. } else {
  103. // Mimic real kafka - if a group doesn't exist, return
  104. // an entry with state "Dead"
  105. response.Groups = append(response.Groups, &GroupDescription{
  106. GroupId: requestedGroup,
  107. State: "Dead",
  108. })
  109. }
  110. }
  111. return response
  112. }
  113. // MockMetadataResponse is a `MetadataResponse` builder.
  114. type MockMetadataResponse struct {
  115. controllerID int32
  116. leaders map[string]map[int32]int32
  117. brokers map[string]int32
  118. t TestReporter
  119. }
  120. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  121. return &MockMetadataResponse{
  122. leaders: make(map[string]map[int32]int32),
  123. brokers: make(map[string]int32),
  124. t: t,
  125. }
  126. }
  127. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  128. partitions := mmr.leaders[topic]
  129. if partitions == nil {
  130. partitions = make(map[int32]int32)
  131. mmr.leaders[topic] = partitions
  132. }
  133. partitions[partition] = brokerID
  134. return mmr
  135. }
  136. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  137. mmr.brokers[addr] = brokerID
  138. return mmr
  139. }
  140. func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
  141. mmr.controllerID = brokerID
  142. return mmr
  143. }
  144. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
  145. metadataRequest := reqBody.(*MetadataRequest)
  146. metadataResponse := &MetadataResponse{
  147. Version: metadataRequest.version(),
  148. ControllerID: mmr.controllerID,
  149. }
  150. for addr, brokerID := range mmr.brokers {
  151. metadataResponse.AddBroker(addr, brokerID)
  152. }
  153. // Generate set of replicas
  154. var replicas []int32
  155. var offlineReplicas []int32
  156. for _, brokerID := range mmr.brokers {
  157. replicas = append(replicas, brokerID)
  158. }
  159. if len(metadataRequest.Topics) == 0 {
  160. for topic, partitions := range mmr.leaders {
  161. for partition, brokerID := range partitions {
  162. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  163. }
  164. }
  165. return metadataResponse
  166. }
  167. for _, topic := range metadataRequest.Topics {
  168. for partition, brokerID := range mmr.leaders[topic] {
  169. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  170. }
  171. }
  172. return metadataResponse
  173. }
  174. // MockOffsetResponse is an `OffsetResponse` builder.
  175. type MockOffsetResponse struct {
  176. offsets map[string]map[int32]map[int64]int64
  177. t TestReporter
  178. version int16
  179. }
  180. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  181. return &MockOffsetResponse{
  182. offsets: make(map[string]map[int32]map[int64]int64),
  183. t: t,
  184. }
  185. }
  186. func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
  187. mor.version = version
  188. return mor
  189. }
  190. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  191. partitions := mor.offsets[topic]
  192. if partitions == nil {
  193. partitions = make(map[int32]map[int64]int64)
  194. mor.offsets[topic] = partitions
  195. }
  196. times := partitions[partition]
  197. if times == nil {
  198. times = make(map[int64]int64)
  199. partitions[partition] = times
  200. }
  201. times[time] = offset
  202. return mor
  203. }
  204. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
  205. offsetRequest := reqBody.(*OffsetRequest)
  206. offsetResponse := &OffsetResponse{Version: mor.version}
  207. for topic, partitions := range offsetRequest.blocks {
  208. for partition, block := range partitions {
  209. offset := mor.getOffset(topic, partition, block.time)
  210. offsetResponse.AddTopicPartition(topic, partition, offset)
  211. }
  212. }
  213. return offsetResponse
  214. }
  215. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  216. partitions := mor.offsets[topic]
  217. if partitions == nil {
  218. mor.t.Errorf("missing topic: %s", topic)
  219. }
  220. times := partitions[partition]
  221. if times == nil {
  222. mor.t.Errorf("missing partition: %d", partition)
  223. }
  224. offset, ok := times[time]
  225. if !ok {
  226. mor.t.Errorf("missing time: %d", time)
  227. }
  228. return offset
  229. }
  230. // MockFetchResponse is a `FetchResponse` builder.
  231. type MockFetchResponse struct {
  232. messages map[string]map[int32]map[int64]Encoder
  233. highWaterMarks map[string]map[int32]int64
  234. t TestReporter
  235. batchSize int
  236. version int16
  237. }
  238. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  239. return &MockFetchResponse{
  240. messages: make(map[string]map[int32]map[int64]Encoder),
  241. highWaterMarks: make(map[string]map[int32]int64),
  242. t: t,
  243. batchSize: batchSize,
  244. }
  245. }
  246. func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
  247. mfr.version = version
  248. return mfr
  249. }
  250. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  251. partitions := mfr.messages[topic]
  252. if partitions == nil {
  253. partitions = make(map[int32]map[int64]Encoder)
  254. mfr.messages[topic] = partitions
  255. }
  256. messages := partitions[partition]
  257. if messages == nil {
  258. messages = make(map[int64]Encoder)
  259. partitions[partition] = messages
  260. }
  261. messages[offset] = msg
  262. return mfr
  263. }
  264. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  265. partitions := mfr.highWaterMarks[topic]
  266. if partitions == nil {
  267. partitions = make(map[int32]int64)
  268. mfr.highWaterMarks[topic] = partitions
  269. }
  270. partitions[partition] = offset
  271. return mfr
  272. }
  273. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
  274. fetchRequest := reqBody.(*FetchRequest)
  275. res := &FetchResponse{
  276. Version: mfr.version,
  277. }
  278. for topic, partitions := range fetchRequest.blocks {
  279. for partition, block := range partitions {
  280. initialOffset := block.fetchOffset
  281. offset := initialOffset
  282. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  283. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  284. msg := mfr.getMessage(topic, partition, offset)
  285. if msg != nil {
  286. res.AddMessage(topic, partition, nil, msg, offset)
  287. i++
  288. }
  289. offset++
  290. }
  291. fb := res.GetBlock(topic, partition)
  292. if fb == nil {
  293. res.AddError(topic, partition, ErrNoError)
  294. fb = res.GetBlock(topic, partition)
  295. }
  296. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  297. }
  298. }
  299. return res
  300. }
  301. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
  302. partitions := mfr.messages[topic]
  303. if partitions == nil {
  304. return nil
  305. }
  306. messages := partitions[partition]
  307. if messages == nil {
  308. return nil
  309. }
  310. return messages[offset]
  311. }
  312. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  313. partitions := mfr.messages[topic]
  314. if partitions == nil {
  315. return 0
  316. }
  317. messages := partitions[partition]
  318. if messages == nil {
  319. return 0
  320. }
  321. return len(messages)
  322. }
  323. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  324. partitions := mfr.highWaterMarks[topic]
  325. if partitions == nil {
  326. return 0
  327. }
  328. return partitions[partition]
  329. }
  330. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  331. type MockConsumerMetadataResponse struct {
  332. coordinators map[string]interface{}
  333. t TestReporter
  334. }
  335. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  336. return &MockConsumerMetadataResponse{
  337. coordinators: make(map[string]interface{}),
  338. t: t,
  339. }
  340. }
  341. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  342. mr.coordinators[group] = broker
  343. return mr
  344. }
  345. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  346. mr.coordinators[group] = kerror
  347. return mr
  348. }
  349. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
  350. req := reqBody.(*ConsumerMetadataRequest)
  351. group := req.ConsumerGroup
  352. res := &ConsumerMetadataResponse{}
  353. v := mr.coordinators[group]
  354. switch v := v.(type) {
  355. case *MockBroker:
  356. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  357. case KError:
  358. res.Err = v
  359. }
  360. return res
  361. }
  362. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  363. type MockFindCoordinatorResponse struct {
  364. groupCoordinators map[string]interface{}
  365. transCoordinators map[string]interface{}
  366. t TestReporter
  367. }
  368. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  369. return &MockFindCoordinatorResponse{
  370. groupCoordinators: make(map[string]interface{}),
  371. transCoordinators: make(map[string]interface{}),
  372. t: t,
  373. }
  374. }
  375. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  376. switch coordinatorType {
  377. case CoordinatorGroup:
  378. mr.groupCoordinators[group] = broker
  379. case CoordinatorTransaction:
  380. mr.transCoordinators[group] = broker
  381. }
  382. return mr
  383. }
  384. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  385. switch coordinatorType {
  386. case CoordinatorGroup:
  387. mr.groupCoordinators[group] = kerror
  388. case CoordinatorTransaction:
  389. mr.transCoordinators[group] = kerror
  390. }
  391. return mr
  392. }
  393. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
  394. req := reqBody.(*FindCoordinatorRequest)
  395. res := &FindCoordinatorResponse{}
  396. var v interface{}
  397. switch req.CoordinatorType {
  398. case CoordinatorGroup:
  399. v = mr.groupCoordinators[req.CoordinatorKey]
  400. case CoordinatorTransaction:
  401. v = mr.transCoordinators[req.CoordinatorKey]
  402. }
  403. switch v := v.(type) {
  404. case *MockBroker:
  405. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  406. case KError:
  407. res.Err = v
  408. }
  409. return res
  410. }
  411. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  412. type MockOffsetCommitResponse struct {
  413. errors map[string]map[string]map[int32]KError
  414. t TestReporter
  415. }
  416. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  417. return &MockOffsetCommitResponse{t: t}
  418. }
  419. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  420. if mr.errors == nil {
  421. mr.errors = make(map[string]map[string]map[int32]KError)
  422. }
  423. topics := mr.errors[group]
  424. if topics == nil {
  425. topics = make(map[string]map[int32]KError)
  426. mr.errors[group] = topics
  427. }
  428. partitions := topics[topic]
  429. if partitions == nil {
  430. partitions = make(map[int32]KError)
  431. topics[topic] = partitions
  432. }
  433. partitions[partition] = kerror
  434. return mr
  435. }
  436. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
  437. req := reqBody.(*OffsetCommitRequest)
  438. group := req.ConsumerGroup
  439. res := &OffsetCommitResponse{}
  440. for topic, partitions := range req.blocks {
  441. for partition := range partitions {
  442. res.AddError(topic, partition, mr.getError(group, topic, partition))
  443. }
  444. }
  445. return res
  446. }
  447. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  448. topics := mr.errors[group]
  449. if topics == nil {
  450. return ErrNoError
  451. }
  452. partitions := topics[topic]
  453. if partitions == nil {
  454. return ErrNoError
  455. }
  456. kerror, ok := partitions[partition]
  457. if !ok {
  458. return ErrNoError
  459. }
  460. return kerror
  461. }
  462. // MockProduceResponse is a `ProduceResponse` builder.
  463. type MockProduceResponse struct {
  464. version int16
  465. errors map[string]map[int32]KError
  466. t TestReporter
  467. }
  468. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  469. return &MockProduceResponse{t: t}
  470. }
  471. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  472. mr.version = version
  473. return mr
  474. }
  475. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  476. if mr.errors == nil {
  477. mr.errors = make(map[string]map[int32]KError)
  478. }
  479. partitions := mr.errors[topic]
  480. if partitions == nil {
  481. partitions = make(map[int32]KError)
  482. mr.errors[topic] = partitions
  483. }
  484. partitions[partition] = kerror
  485. return mr
  486. }
  487. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
  488. req := reqBody.(*ProduceRequest)
  489. res := &ProduceResponse{
  490. Version: mr.version,
  491. }
  492. for topic, partitions := range req.records {
  493. for partition := range partitions {
  494. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  495. }
  496. }
  497. return res
  498. }
  499. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  500. partitions := mr.errors[topic]
  501. if partitions == nil {
  502. return ErrNoError
  503. }
  504. kerror, ok := partitions[partition]
  505. if !ok {
  506. return ErrNoError
  507. }
  508. return kerror
  509. }
  510. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  511. type MockOffsetFetchResponse struct {
  512. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  513. error KError
  514. t TestReporter
  515. }
  516. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  517. return &MockOffsetFetchResponse{t: t}
  518. }
  519. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  520. if mr.offsets == nil {
  521. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  522. }
  523. topics := mr.offsets[group]
  524. if topics == nil {
  525. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  526. mr.offsets[group] = topics
  527. }
  528. partitions := topics[topic]
  529. if partitions == nil {
  530. partitions = make(map[int32]*OffsetFetchResponseBlock)
  531. topics[topic] = partitions
  532. }
  533. partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
  534. return mr
  535. }
  536. func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
  537. mr.error = kerror
  538. return mr
  539. }
  540. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
  541. req := reqBody.(*OffsetFetchRequest)
  542. group := req.ConsumerGroup
  543. res := &OffsetFetchResponse{Version: req.Version}
  544. for topic, partitions := range mr.offsets[group] {
  545. for partition, block := range partitions {
  546. res.AddBlock(topic, partition, block)
  547. }
  548. }
  549. if res.Version >= 2 {
  550. res.Err = mr.error
  551. }
  552. return res
  553. }
  554. type MockCreateTopicsResponse struct {
  555. t TestReporter
  556. }
  557. func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
  558. return &MockCreateTopicsResponse{t: t}
  559. }
  560. func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  561. req := reqBody.(*CreateTopicsRequest)
  562. res := &CreateTopicsResponse{
  563. Version: req.Version,
  564. }
  565. res.TopicErrors = make(map[string]*TopicError)
  566. for topic := range req.TopicDetails {
  567. if res.Version >= 1 && strings.HasPrefix(topic, "_") {
  568. msg := "insufficient permissions to create topic with reserved prefix"
  569. res.TopicErrors[topic] = &TopicError{
  570. Err: ErrTopicAuthorizationFailed,
  571. ErrMsg: &msg,
  572. }
  573. continue
  574. }
  575. res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
  576. }
  577. return res
  578. }
  579. type MockDeleteTopicsResponse struct {
  580. t TestReporter
  581. }
  582. func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
  583. return &MockDeleteTopicsResponse{t: t}
  584. }
  585. func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  586. req := reqBody.(*DeleteTopicsRequest)
  587. res := &DeleteTopicsResponse{}
  588. res.TopicErrorCodes = make(map[string]KError)
  589. for _, topic := range req.Topics {
  590. res.TopicErrorCodes[topic] = ErrNoError
  591. }
  592. res.Version = req.Version
  593. return res
  594. }
  595. type MockCreatePartitionsResponse struct {
  596. t TestReporter
  597. }
  598. func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
  599. return &MockCreatePartitionsResponse{t: t}
  600. }
  601. func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  602. req := reqBody.(*CreatePartitionsRequest)
  603. res := &CreatePartitionsResponse{}
  604. res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
  605. for topic := range req.TopicPartitions {
  606. if strings.HasPrefix(topic, "_") {
  607. msg := "insufficient permissions to create partition on topic with reserved prefix"
  608. res.TopicPartitionErrors[topic] = &TopicPartitionError{
  609. Err: ErrTopicAuthorizationFailed,
  610. ErrMsg: &msg,
  611. }
  612. continue
  613. }
  614. res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
  615. }
  616. return res
  617. }
  618. type MockAlterPartitionReassignmentsResponse struct {
  619. t TestReporter
  620. }
  621. func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
  622. return &MockAlterPartitionReassignmentsResponse{t: t}
  623. }
  624. func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  625. req := reqBody.(*AlterPartitionReassignmentsRequest)
  626. _ = req
  627. res := &AlterPartitionReassignmentsResponse{}
  628. return res
  629. }
  630. type MockListPartitionReassignmentsResponse struct {
  631. t TestReporter
  632. }
  633. func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
  634. return &MockListPartitionReassignmentsResponse{t: t}
  635. }
  636. func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  637. req := reqBody.(*ListPartitionReassignmentsRequest)
  638. _ = req
  639. res := &ListPartitionReassignmentsResponse{}
  640. for topic, partitions := range req.blocks {
  641. for _, partition := range partitions {
  642. res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
  643. }
  644. }
  645. return res
  646. }
  647. type MockDeleteRecordsResponse struct {
  648. t TestReporter
  649. }
  650. func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
  651. return &MockDeleteRecordsResponse{t: t}
  652. }
  653. func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  654. req := reqBody.(*DeleteRecordsRequest)
  655. res := &DeleteRecordsResponse{}
  656. res.Topics = make(map[string]*DeleteRecordsResponseTopic)
  657. for topic, deleteRecordRequestTopic := range req.Topics {
  658. partitions := make(map[int32]*DeleteRecordsResponsePartition)
  659. for partition := range deleteRecordRequestTopic.PartitionOffsets {
  660. partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
  661. }
  662. res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
  663. }
  664. return res
  665. }
  666. type MockDescribeConfigsResponse struct {
  667. t TestReporter
  668. }
  669. func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
  670. return &MockDescribeConfigsResponse{t: t}
  671. }
  672. func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  673. req := reqBody.(*DescribeConfigsRequest)
  674. res := &DescribeConfigsResponse{
  675. Version: req.Version,
  676. }
  677. includeSynonyms := req.Version > 0
  678. includeSource := req.Version > 0
  679. for _, r := range req.Resources {
  680. var configEntries []*ConfigEntry
  681. switch r.Type {
  682. case BrokerResource:
  683. configEntries = append(configEntries,
  684. &ConfigEntry{
  685. Name: "min.insync.replicas",
  686. Value: "2",
  687. ReadOnly: false,
  688. Default: false,
  689. },
  690. )
  691. res.Resources = append(res.Resources, &ResourceResponse{
  692. Name: r.Name,
  693. Configs: configEntries,
  694. })
  695. case BrokerLoggerResource:
  696. configEntries = append(configEntries,
  697. &ConfigEntry{
  698. Name: "kafka.controller.KafkaController",
  699. Value: "DEBUG",
  700. ReadOnly: false,
  701. Default: false,
  702. },
  703. )
  704. res.Resources = append(res.Resources, &ResourceResponse{
  705. Name: r.Name,
  706. Configs: configEntries,
  707. })
  708. case TopicResource:
  709. maxMessageBytes := &ConfigEntry{
  710. Name: "max.message.bytes",
  711. Value: "1000000",
  712. ReadOnly: false,
  713. Default: !includeSource,
  714. Sensitive: false,
  715. }
  716. if includeSource {
  717. maxMessageBytes.Source = SourceDefault
  718. }
  719. if includeSynonyms {
  720. maxMessageBytes.Synonyms = []*ConfigSynonym{
  721. {
  722. ConfigName: "max.message.bytes",
  723. ConfigValue: "500000",
  724. },
  725. }
  726. }
  727. retentionMs := &ConfigEntry{
  728. Name: "retention.ms",
  729. Value: "5000",
  730. ReadOnly: false,
  731. Default: false,
  732. Sensitive: false,
  733. }
  734. if includeSynonyms {
  735. retentionMs.Synonyms = []*ConfigSynonym{
  736. {
  737. ConfigName: "log.retention.ms",
  738. ConfigValue: "2500",
  739. },
  740. }
  741. }
  742. password := &ConfigEntry{
  743. Name: "password",
  744. Value: "12345",
  745. ReadOnly: false,
  746. Default: false,
  747. Sensitive: true,
  748. }
  749. configEntries = append(
  750. configEntries, maxMessageBytes, retentionMs, password)
  751. res.Resources = append(res.Resources, &ResourceResponse{
  752. Name: r.Name,
  753. Configs: configEntries,
  754. })
  755. }
  756. }
  757. return res
  758. }
  759. type MockDescribeConfigsResponseWithErrorCode struct {
  760. t TestReporter
  761. }
  762. func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
  763. return &MockDescribeConfigsResponseWithErrorCode{t: t}
  764. }
  765. func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
  766. req := reqBody.(*DescribeConfigsRequest)
  767. res := &DescribeConfigsResponse{
  768. Version: req.Version,
  769. }
  770. for _, r := range req.Resources {
  771. res.Resources = append(res.Resources, &ResourceResponse{
  772. Name: r.Name,
  773. Type: r.Type,
  774. ErrorCode: 83,
  775. ErrorMsg: "",
  776. })
  777. }
  778. return res
  779. }
  780. type MockAlterConfigsResponse struct {
  781. t TestReporter
  782. }
  783. func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
  784. return &MockAlterConfigsResponse{t: t}
  785. }
  786. func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  787. req := reqBody.(*AlterConfigsRequest)
  788. res := &AlterConfigsResponse{}
  789. for _, r := range req.Resources {
  790. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
  791. Name: r.Name,
  792. Type: r.Type,
  793. ErrorMsg: "",
  794. })
  795. }
  796. return res
  797. }
  798. type MockAlterConfigsResponseWithErrorCode struct {
  799. t TestReporter
  800. }
  801. func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
  802. return &MockAlterConfigsResponseWithErrorCode{t: t}
  803. }
  804. func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
  805. req := reqBody.(*AlterConfigsRequest)
  806. res := &AlterConfigsResponse{}
  807. for _, r := range req.Resources {
  808. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
  809. Name: r.Name,
  810. Type: r.Type,
  811. ErrorCode: 83,
  812. ErrorMsg: "",
  813. })
  814. }
  815. return res
  816. }
  817. type MockCreateAclsResponse struct {
  818. t TestReporter
  819. }
  820. func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
  821. return &MockCreateAclsResponse{t: t}
  822. }
  823. func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  824. req := reqBody.(*CreateAclsRequest)
  825. res := &CreateAclsResponse{}
  826. for range req.AclCreations {
  827. res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
  828. }
  829. return res
  830. }
  831. type MockListAclsResponse struct {
  832. t TestReporter
  833. }
  834. func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
  835. return &MockListAclsResponse{t: t}
  836. }
  837. func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  838. req := reqBody.(*DescribeAclsRequest)
  839. res := &DescribeAclsResponse{}
  840. res.Err = ErrNoError
  841. acl := &ResourceAcls{}
  842. if req.ResourceName != nil {
  843. acl.Resource.ResourceName = *req.ResourceName
  844. }
  845. acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
  846. acl.Resource.ResourceType = req.ResourceType
  847. host := "*"
  848. if req.Host != nil {
  849. host = *req.Host
  850. }
  851. principal := "User:test"
  852. if req.Principal != nil {
  853. principal = *req.Principal
  854. }
  855. permissionType := req.PermissionType
  856. if permissionType == AclPermissionAny {
  857. permissionType = AclPermissionAllow
  858. }
  859. acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
  860. res.ResourceAcls = append(res.ResourceAcls, acl)
  861. res.Version = int16(req.Version)
  862. return res
  863. }
  864. type MockSaslAuthenticateResponse struct {
  865. t TestReporter
  866. kerror KError
  867. saslAuthBytes []byte
  868. }
  869. func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
  870. return &MockSaslAuthenticateResponse{t: t}
  871. }
  872. func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
  873. res := &SaslAuthenticateResponse{}
  874. res.Err = msar.kerror
  875. res.SaslAuthBytes = msar.saslAuthBytes
  876. return res
  877. }
  878. func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
  879. msar.kerror = kerror
  880. return msar
  881. }
  882. func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
  883. msar.saslAuthBytes = saslAuthBytes
  884. return msar
  885. }
  886. type MockDeleteAclsResponse struct {
  887. t TestReporter
  888. }
  889. type MockSaslHandshakeResponse struct {
  890. enabledMechanisms []string
  891. kerror KError
  892. t TestReporter
  893. }
  894. func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
  895. return &MockSaslHandshakeResponse{t: t}
  896. }
  897. func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
  898. res := &SaslHandshakeResponse{}
  899. res.Err = mshr.kerror
  900. res.EnabledMechanisms = mshr.enabledMechanisms
  901. return res
  902. }
  903. func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
  904. mshr.kerror = kerror
  905. return mshr
  906. }
  907. func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
  908. mshr.enabledMechanisms = enabledMechanisms
  909. return mshr
  910. }
  911. func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
  912. return &MockDeleteAclsResponse{t: t}
  913. }
  914. func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  915. req := reqBody.(*DeleteAclsRequest)
  916. res := &DeleteAclsResponse{}
  917. for range req.Filters {
  918. response := &FilterResponse{Err: ErrNoError}
  919. response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
  920. res.FilterResponses = append(res.FilterResponses, response)
  921. }
  922. res.Version = int16(req.Version)
  923. return res
  924. }
  925. type MockDeleteGroupsResponse struct {
  926. deletedGroups []string
  927. }
  928. func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
  929. return &MockDeleteGroupsResponse{}
  930. }
  931. func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
  932. m.deletedGroups = groups
  933. return m
  934. }
  935. func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  936. resp := &DeleteGroupsResponse{
  937. GroupErrorCodes: map[string]KError{},
  938. }
  939. for _, group := range m.deletedGroups {
  940. resp.GroupErrorCodes[group] = ErrNoError
  941. }
  942. return resp
  943. }
  944. type MockDeleteOffsetResponse struct {
  945. errorCode KError
  946. topic string
  947. partition int32
  948. errorPartition KError
  949. }
  950. func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {
  951. return &MockDeleteOffsetResponse{}
  952. }
  953. func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {
  954. m.errorCode = errorCode
  955. m.topic = topic
  956. m.partition = partition
  957. m.errorPartition = errorPartition
  958. return m
  959. }
  960. func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
  961. resp := &DeleteOffsetsResponse{
  962. ErrorCode: m.errorCode,
  963. Errors: map[string]map[int32]KError{
  964. m.topic: {m.partition: m.errorPartition},
  965. },
  966. }
  967. return resp
  968. }
  969. type MockJoinGroupResponse struct {
  970. t TestReporter
  971. ThrottleTime int32
  972. Err KError
  973. GenerationId int32
  974. GroupProtocol string
  975. LeaderId string
  976. MemberId string
  977. Members map[string][]byte
  978. }
  979. func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
  980. return &MockJoinGroupResponse{
  981. t: t,
  982. Members: make(map[string][]byte),
  983. }
  984. }
  985. func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
  986. req := reqBody.(*JoinGroupRequest)
  987. resp := &JoinGroupResponse{
  988. Version: req.Version,
  989. ThrottleTime: m.ThrottleTime,
  990. Err: m.Err,
  991. GenerationId: m.GenerationId,
  992. GroupProtocol: m.GroupProtocol,
  993. LeaderId: m.LeaderId,
  994. MemberId: m.MemberId,
  995. Members: m.Members,
  996. }
  997. return resp
  998. }
  999. func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
  1000. m.ThrottleTime = t
  1001. return m
  1002. }
  1003. func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
  1004. m.Err = kerr
  1005. return m
  1006. }
  1007. func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
  1008. m.GenerationId = id
  1009. return m
  1010. }
  1011. func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
  1012. m.GroupProtocol = proto
  1013. return m
  1014. }
  1015. func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
  1016. m.LeaderId = id
  1017. return m
  1018. }
  1019. func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
  1020. m.MemberId = id
  1021. return m
  1022. }
  1023. func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
  1024. bin, err := encode(meta, nil)
  1025. if err != nil {
  1026. panic(fmt.Sprintf("error encoding member metadata: %v", err))
  1027. }
  1028. m.Members[id] = bin
  1029. return m
  1030. }
  1031. type MockLeaveGroupResponse struct {
  1032. t TestReporter
  1033. Err KError
  1034. }
  1035. func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
  1036. return &MockLeaveGroupResponse{t: t}
  1037. }
  1038. func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1039. resp := &LeaveGroupResponse{
  1040. Err: m.Err,
  1041. }
  1042. return resp
  1043. }
  1044. func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
  1045. m.Err = kerr
  1046. return m
  1047. }
  1048. type MockSyncGroupResponse struct {
  1049. t TestReporter
  1050. Err KError
  1051. MemberAssignment []byte
  1052. }
  1053. func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
  1054. return &MockSyncGroupResponse{t: t}
  1055. }
  1056. func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1057. resp := &SyncGroupResponse{
  1058. Err: m.Err,
  1059. MemberAssignment: m.MemberAssignment,
  1060. }
  1061. return resp
  1062. }
  1063. func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
  1064. m.Err = kerr
  1065. return m
  1066. }
  1067. func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
  1068. bin, err := encode(assignment, nil)
  1069. if err != nil {
  1070. panic(fmt.Sprintf("error encoding member assignment: %v", err))
  1071. }
  1072. m.MemberAssignment = bin
  1073. return m
  1074. }
  1075. type MockHeartbeatResponse struct {
  1076. t TestReporter
  1077. Err KError
  1078. }
  1079. func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
  1080. return &MockHeartbeatResponse{t: t}
  1081. }
  1082. func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1083. resp := &HeartbeatResponse{}
  1084. return resp
  1085. }
  1086. func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
  1087. m.Err = kerr
  1088. return m
  1089. }
  1090. type MockDescribeLogDirsResponse struct {
  1091. t TestReporter
  1092. logDirs []DescribeLogDirsResponseDirMetadata
  1093. }
  1094. func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
  1095. return &MockDescribeLogDirsResponse{t: t}
  1096. }
  1097. func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
  1098. var topics []DescribeLogDirsResponseTopic
  1099. for topic := range topicPartitions {
  1100. var partitions []DescribeLogDirsResponsePartition
  1101. for i := 0; i < topicPartitions[topic]; i++ {
  1102. partitions = append(partitions, DescribeLogDirsResponsePartition{
  1103. PartitionID: int32(i),
  1104. IsTemporary: false,
  1105. OffsetLag: int64(0),
  1106. Size: int64(1234),
  1107. })
  1108. }
  1109. topics = append(topics, DescribeLogDirsResponseTopic{
  1110. Topic: topic,
  1111. Partitions: partitions,
  1112. })
  1113. }
  1114. logDir := DescribeLogDirsResponseDirMetadata{
  1115. ErrorCode: ErrNoError,
  1116. Path: logDirPath,
  1117. Topics: topics,
  1118. }
  1119. m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
  1120. return m
  1121. }
  1122. func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1123. resp := &DescribeLogDirsResponse{
  1124. LogDirs: m.logDirs,
  1125. }
  1126. return resp
  1127. }
  1128. type MockApiVersionsResponse struct {
  1129. t TestReporter
  1130. }
  1131. func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {
  1132. return &MockApiVersionsResponse{t: t}
  1133. }
  1134. func (mr *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1135. req := reqBody.(*ApiVersionsRequest)
  1136. res := &ApiVersionsResponse{
  1137. Version: req.Version,
  1138. ApiKeys: []ApiVersionsResponseKey{
  1139. {
  1140. Version: req.Version,
  1141. ApiKey: 0,
  1142. MinVersion: 5,
  1143. MaxVersion: 8,
  1144. },
  1145. {
  1146. Version: req.Version,
  1147. ApiKey: 1,
  1148. MinVersion: 7,
  1149. MaxVersion: 11,
  1150. },
  1151. },
  1152. }
  1153. return res
  1154. }