123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332 |
- package sarama
- import (
- "fmt"
- "strings"
- )
- // TestReporter has methods matching go's testing.T to avoid importing
- // `testing` in the main part of the library.
- type TestReporter interface {
- Error(...interface{})
- Errorf(string, ...interface{})
- Fatal(...interface{})
- Fatalf(string, ...interface{})
- }
- // MockResponse is a response builder interface it defines one method that
- // allows generating a response based on a request body. MockResponses are used
- // to program behavior of MockBroker in tests.
- type MockResponse interface {
- For(reqBody versionedDecoder) (res encoderWithHeader)
- }
- // MockWrapper is a mock response builder that returns a particular concrete
- // response regardless of the actual request passed to the `For` method.
- type MockWrapper struct {
- res encoderWithHeader
- }
- func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
- return mw.res
- }
- func NewMockWrapper(res encoderWithHeader) *MockWrapper {
- return &MockWrapper{res: res}
- }
- // MockSequence is a mock response builder that is created from a sequence of
- // concrete responses. Every time when a `MockBroker` calls its `For` method
- // the next response from the sequence is returned. When the end of the
- // sequence is reached the last element from the sequence is returned.
- type MockSequence struct {
- responses []MockResponse
- }
- func NewMockSequence(responses ...interface{}) *MockSequence {
- ms := &MockSequence{}
- ms.responses = make([]MockResponse, len(responses))
- for i, res := range responses {
- switch res := res.(type) {
- case MockResponse:
- ms.responses[i] = res
- case encoderWithHeader:
- ms.responses[i] = NewMockWrapper(res)
- default:
- panic(fmt.Sprintf("Unexpected response type: %T", res))
- }
- }
- return ms
- }
- func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
- res = mc.responses[0].For(reqBody)
- if len(mc.responses) > 1 {
- mc.responses = mc.responses[1:]
- }
- return res
- }
- type MockListGroupsResponse struct {
- groups map[string]string
- t TestReporter
- }
- func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
- return &MockListGroupsResponse{
- groups: make(map[string]string),
- t: t,
- }
- }
- func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- request := reqBody.(*ListGroupsRequest)
- _ = request
- response := &ListGroupsResponse{
- Groups: m.groups,
- }
- return response
- }
- func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
- m.groups[groupID] = protocolType
- return m
- }
- type MockDescribeGroupsResponse struct {
- groups map[string]*GroupDescription
- t TestReporter
- }
- func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
- return &MockDescribeGroupsResponse{
- t: t,
- groups: make(map[string]*GroupDescription),
- }
- }
- func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
- m.groups[groupID] = description
- return m
- }
- func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- request := reqBody.(*DescribeGroupsRequest)
- response := &DescribeGroupsResponse{}
- for _, requestedGroup := range request.Groups {
- if group, ok := m.groups[requestedGroup]; ok {
- response.Groups = append(response.Groups, group)
- } else {
- // Mimic real kafka - if a group doesn't exist, return
- // an entry with state "Dead"
- response.Groups = append(response.Groups, &GroupDescription{
- GroupId: requestedGroup,
- State: "Dead",
- })
- }
- }
- return response
- }
- // MockMetadataResponse is a `MetadataResponse` builder.
- type MockMetadataResponse struct {
- controllerID int32
- leaders map[string]map[int32]int32
- brokers map[string]int32
- t TestReporter
- }
- func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
- return &MockMetadataResponse{
- leaders: make(map[string]map[int32]int32),
- brokers: make(map[string]int32),
- t: t,
- }
- }
- func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
- partitions := mmr.leaders[topic]
- if partitions == nil {
- partitions = make(map[int32]int32)
- mmr.leaders[topic] = partitions
- }
- partitions[partition] = brokerID
- return mmr
- }
- func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
- mmr.brokers[addr] = brokerID
- return mmr
- }
- func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
- mmr.controllerID = brokerID
- return mmr
- }
- func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
- metadataRequest := reqBody.(*MetadataRequest)
- metadataResponse := &MetadataResponse{
- Version: metadataRequest.version(),
- ControllerID: mmr.controllerID,
- }
- for addr, brokerID := range mmr.brokers {
- metadataResponse.AddBroker(addr, brokerID)
- }
- // Generate set of replicas
- var replicas []int32
- var offlineReplicas []int32
- for _, brokerID := range mmr.brokers {
- replicas = append(replicas, brokerID)
- }
- if len(metadataRequest.Topics) == 0 {
- for topic, partitions := range mmr.leaders {
- for partition, brokerID := range partitions {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
- }
- }
- return metadataResponse
- }
- for _, topic := range metadataRequest.Topics {
- for partition, brokerID := range mmr.leaders[topic] {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
- }
- }
- return metadataResponse
- }
- // MockOffsetResponse is an `OffsetResponse` builder.
- type MockOffsetResponse struct {
- offsets map[string]map[int32]map[int64]int64
- t TestReporter
- version int16
- }
- func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
- return &MockOffsetResponse{
- offsets: make(map[string]map[int32]map[int64]int64),
- t: t,
- }
- }
- func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
- mor.version = version
- return mor
- }
- func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
- partitions := mor.offsets[topic]
- if partitions == nil {
- partitions = make(map[int32]map[int64]int64)
- mor.offsets[topic] = partitions
- }
- times := partitions[partition]
- if times == nil {
- times = make(map[int64]int64)
- partitions[partition] = times
- }
- times[time] = offset
- return mor
- }
- func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
- offsetRequest := reqBody.(*OffsetRequest)
- offsetResponse := &OffsetResponse{Version: mor.version}
- for topic, partitions := range offsetRequest.blocks {
- for partition, block := range partitions {
- offset := mor.getOffset(topic, partition, block.time)
- offsetResponse.AddTopicPartition(topic, partition, offset)
- }
- }
- return offsetResponse
- }
- func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
- partitions := mor.offsets[topic]
- if partitions == nil {
- mor.t.Errorf("missing topic: %s", topic)
- }
- times := partitions[partition]
- if times == nil {
- mor.t.Errorf("missing partition: %d", partition)
- }
- offset, ok := times[time]
- if !ok {
- mor.t.Errorf("missing time: %d", time)
- }
- return offset
- }
- // MockFetchResponse is a `FetchResponse` builder.
- type MockFetchResponse struct {
- messages map[string]map[int32]map[int64]Encoder
- highWaterMarks map[string]map[int32]int64
- t TestReporter
- batchSize int
- version int16
- }
- func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
- return &MockFetchResponse{
- messages: make(map[string]map[int32]map[int64]Encoder),
- highWaterMarks: make(map[string]map[int32]int64),
- t: t,
- batchSize: batchSize,
- }
- }
- func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
- mfr.version = version
- return mfr
- }
- func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
- partitions := mfr.messages[topic]
- if partitions == nil {
- partitions = make(map[int32]map[int64]Encoder)
- mfr.messages[topic] = partitions
- }
- messages := partitions[partition]
- if messages == nil {
- messages = make(map[int64]Encoder)
- partitions[partition] = messages
- }
- messages[offset] = msg
- return mfr
- }
- func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
- partitions := mfr.highWaterMarks[topic]
- if partitions == nil {
- partitions = make(map[int32]int64)
- mfr.highWaterMarks[topic] = partitions
- }
- partitions[partition] = offset
- return mfr
- }
- func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
- fetchRequest := reqBody.(*FetchRequest)
- res := &FetchResponse{
- Version: mfr.version,
- }
- for topic, partitions := range fetchRequest.blocks {
- for partition, block := range partitions {
- initialOffset := block.fetchOffset
- offset := initialOffset
- maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
- for i := 0; i < mfr.batchSize && offset < maxOffset; {
- msg := mfr.getMessage(topic, partition, offset)
- if msg != nil {
- res.AddMessage(topic, partition, nil, msg, offset)
- i++
- }
- offset++
- }
- fb := res.GetBlock(topic, partition)
- if fb == nil {
- res.AddError(topic, partition, ErrNoError)
- fb = res.GetBlock(topic, partition)
- }
- fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
- }
- }
- return res
- }
- func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
- partitions := mfr.messages[topic]
- if partitions == nil {
- return nil
- }
- messages := partitions[partition]
- if messages == nil {
- return nil
- }
- return messages[offset]
- }
- func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
- partitions := mfr.messages[topic]
- if partitions == nil {
- return 0
- }
- messages := partitions[partition]
- if messages == nil {
- return 0
- }
- return len(messages)
- }
- func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
- partitions := mfr.highWaterMarks[topic]
- if partitions == nil {
- return 0
- }
- return partitions[partition]
- }
- // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
- type MockConsumerMetadataResponse struct {
- coordinators map[string]interface{}
- t TestReporter
- }
- func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
- return &MockConsumerMetadataResponse{
- coordinators: make(map[string]interface{}),
- t: t,
- }
- }
- func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
- mr.coordinators[group] = broker
- return mr
- }
- func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
- mr.coordinators[group] = kerror
- return mr
- }
- func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*ConsumerMetadataRequest)
- group := req.ConsumerGroup
- res := &ConsumerMetadataResponse{}
- v := mr.coordinators[group]
- switch v := v.(type) {
- case *MockBroker:
- res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
- case KError:
- res.Err = v
- }
- return res
- }
- // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
- type MockFindCoordinatorResponse struct {
- groupCoordinators map[string]interface{}
- transCoordinators map[string]interface{}
- t TestReporter
- }
- func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
- return &MockFindCoordinatorResponse{
- groupCoordinators: make(map[string]interface{}),
- transCoordinators: make(map[string]interface{}),
- t: t,
- }
- }
- func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
- switch coordinatorType {
- case CoordinatorGroup:
- mr.groupCoordinators[group] = broker
- case CoordinatorTransaction:
- mr.transCoordinators[group] = broker
- }
- return mr
- }
- func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
- switch coordinatorType {
- case CoordinatorGroup:
- mr.groupCoordinators[group] = kerror
- case CoordinatorTransaction:
- mr.transCoordinators[group] = kerror
- }
- return mr
- }
- func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*FindCoordinatorRequest)
- res := &FindCoordinatorResponse{}
- var v interface{}
- switch req.CoordinatorType {
- case CoordinatorGroup:
- v = mr.groupCoordinators[req.CoordinatorKey]
- case CoordinatorTransaction:
- v = mr.transCoordinators[req.CoordinatorKey]
- }
- switch v := v.(type) {
- case *MockBroker:
- res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
- case KError:
- res.Err = v
- }
- return res
- }
- // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
- type MockOffsetCommitResponse struct {
- errors map[string]map[string]map[int32]KError
- t TestReporter
- }
- func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
- return &MockOffsetCommitResponse{t: t}
- }
- func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
- if mr.errors == nil {
- mr.errors = make(map[string]map[string]map[int32]KError)
- }
- topics := mr.errors[group]
- if topics == nil {
- topics = make(map[string]map[int32]KError)
- mr.errors[group] = topics
- }
- partitions := topics[topic]
- if partitions == nil {
- partitions = make(map[int32]KError)
- topics[topic] = partitions
- }
- partitions[partition] = kerror
- return mr
- }
- func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*OffsetCommitRequest)
- group := req.ConsumerGroup
- res := &OffsetCommitResponse{}
- for topic, partitions := range req.blocks {
- for partition := range partitions {
- res.AddError(topic, partition, mr.getError(group, topic, partition))
- }
- }
- return res
- }
- func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
- topics := mr.errors[group]
- if topics == nil {
- return ErrNoError
- }
- partitions := topics[topic]
- if partitions == nil {
- return ErrNoError
- }
- kerror, ok := partitions[partition]
- if !ok {
- return ErrNoError
- }
- return kerror
- }
- // MockProduceResponse is a `ProduceResponse` builder.
- type MockProduceResponse struct {
- version int16
- errors map[string]map[int32]KError
- t TestReporter
- }
- func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
- return &MockProduceResponse{t: t}
- }
- func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
- mr.version = version
- return mr
- }
- func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
- if mr.errors == nil {
- mr.errors = make(map[string]map[int32]KError)
- }
- partitions := mr.errors[topic]
- if partitions == nil {
- partitions = make(map[int32]KError)
- mr.errors[topic] = partitions
- }
- partitions[partition] = kerror
- return mr
- }
- func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*ProduceRequest)
- res := &ProduceResponse{
- Version: mr.version,
- }
- for topic, partitions := range req.records {
- for partition := range partitions {
- res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
- }
- }
- return res
- }
- func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
- partitions := mr.errors[topic]
- if partitions == nil {
- return ErrNoError
- }
- kerror, ok := partitions[partition]
- if !ok {
- return ErrNoError
- }
- return kerror
- }
- // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
- type MockOffsetFetchResponse struct {
- offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
- error KError
- t TestReporter
- }
- func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
- return &MockOffsetFetchResponse{t: t}
- }
- func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
- if mr.offsets == nil {
- mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
- }
- topics := mr.offsets[group]
- if topics == nil {
- topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
- mr.offsets[group] = topics
- }
- partitions := topics[topic]
- if partitions == nil {
- partitions = make(map[int32]*OffsetFetchResponseBlock)
- topics[topic] = partitions
- }
- partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
- return mr
- }
- func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
- mr.error = kerror
- return mr
- }
- func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*OffsetFetchRequest)
- group := req.ConsumerGroup
- res := &OffsetFetchResponse{Version: req.Version}
- for topic, partitions := range mr.offsets[group] {
- for partition, block := range partitions {
- res.AddBlock(topic, partition, block)
- }
- }
- if res.Version >= 2 {
- res.Err = mr.error
- }
- return res
- }
- type MockCreateTopicsResponse struct {
- t TestReporter
- }
- func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
- return &MockCreateTopicsResponse{t: t}
- }
- func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*CreateTopicsRequest)
- res := &CreateTopicsResponse{
- Version: req.Version,
- }
- res.TopicErrors = make(map[string]*TopicError)
- for topic := range req.TopicDetails {
- if res.Version >= 1 && strings.HasPrefix(topic, "_") {
- msg := "insufficient permissions to create topic with reserved prefix"
- res.TopicErrors[topic] = &TopicError{
- Err: ErrTopicAuthorizationFailed,
- ErrMsg: &msg,
- }
- continue
- }
- res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
- }
- return res
- }
- type MockDeleteTopicsResponse struct {
- t TestReporter
- }
- func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
- return &MockDeleteTopicsResponse{t: t}
- }
- func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*DeleteTopicsRequest)
- res := &DeleteTopicsResponse{}
- res.TopicErrorCodes = make(map[string]KError)
- for _, topic := range req.Topics {
- res.TopicErrorCodes[topic] = ErrNoError
- }
- res.Version = req.Version
- return res
- }
- type MockCreatePartitionsResponse struct {
- t TestReporter
- }
- func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
- return &MockCreatePartitionsResponse{t: t}
- }
- func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*CreatePartitionsRequest)
- res := &CreatePartitionsResponse{}
- res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
- for topic := range req.TopicPartitions {
- if strings.HasPrefix(topic, "_") {
- msg := "insufficient permissions to create partition on topic with reserved prefix"
- res.TopicPartitionErrors[topic] = &TopicPartitionError{
- Err: ErrTopicAuthorizationFailed,
- ErrMsg: &msg,
- }
- continue
- }
- res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
- }
- return res
- }
- type MockAlterPartitionReassignmentsResponse struct {
- t TestReporter
- }
- func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
- return &MockAlterPartitionReassignmentsResponse{t: t}
- }
- func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*AlterPartitionReassignmentsRequest)
- _ = req
- res := &AlterPartitionReassignmentsResponse{}
- return res
- }
- type MockListPartitionReassignmentsResponse struct {
- t TestReporter
- }
- func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
- return &MockListPartitionReassignmentsResponse{t: t}
- }
- func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*ListPartitionReassignmentsRequest)
- _ = req
- res := &ListPartitionReassignmentsResponse{}
- for topic, partitions := range req.blocks {
- for _, partition := range partitions {
- res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
- }
- }
- return res
- }
- type MockDeleteRecordsResponse struct {
- t TestReporter
- }
- func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
- return &MockDeleteRecordsResponse{t: t}
- }
- func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*DeleteRecordsRequest)
- res := &DeleteRecordsResponse{}
- res.Topics = make(map[string]*DeleteRecordsResponseTopic)
- for topic, deleteRecordRequestTopic := range req.Topics {
- partitions := make(map[int32]*DeleteRecordsResponsePartition)
- for partition := range deleteRecordRequestTopic.PartitionOffsets {
- partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
- }
- res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
- }
- return res
- }
- type MockDescribeConfigsResponse struct {
- t TestReporter
- }
- func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
- return &MockDescribeConfigsResponse{t: t}
- }
- func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*DescribeConfigsRequest)
- res := &DescribeConfigsResponse{
- Version: req.Version,
- }
- includeSynonyms := req.Version > 0
- includeSource := req.Version > 0
- for _, r := range req.Resources {
- var configEntries []*ConfigEntry
- switch r.Type {
- case BrokerResource:
- configEntries = append(configEntries,
- &ConfigEntry{
- Name: "min.insync.replicas",
- Value: "2",
- ReadOnly: false,
- Default: false,
- },
- )
- res.Resources = append(res.Resources, &ResourceResponse{
- Name: r.Name,
- Configs: configEntries,
- })
- case BrokerLoggerResource:
- configEntries = append(configEntries,
- &ConfigEntry{
- Name: "kafka.controller.KafkaController",
- Value: "DEBUG",
- ReadOnly: false,
- Default: false,
- },
- )
- res.Resources = append(res.Resources, &ResourceResponse{
- Name: r.Name,
- Configs: configEntries,
- })
- case TopicResource:
- maxMessageBytes := &ConfigEntry{
- Name: "max.message.bytes",
- Value: "1000000",
- ReadOnly: false,
- Default: !includeSource,
- Sensitive: false,
- }
- if includeSource {
- maxMessageBytes.Source = SourceDefault
- }
- if includeSynonyms {
- maxMessageBytes.Synonyms = []*ConfigSynonym{
- {
- ConfigName: "max.message.bytes",
- ConfigValue: "500000",
- },
- }
- }
- retentionMs := &ConfigEntry{
- Name: "retention.ms",
- Value: "5000",
- ReadOnly: false,
- Default: false,
- Sensitive: false,
- }
- if includeSynonyms {
- retentionMs.Synonyms = []*ConfigSynonym{
- {
- ConfigName: "log.retention.ms",
- ConfigValue: "2500",
- },
- }
- }
- password := &ConfigEntry{
- Name: "password",
- Value: "12345",
- ReadOnly: false,
- Default: false,
- Sensitive: true,
- }
- configEntries = append(
- configEntries, maxMessageBytes, retentionMs, password)
- res.Resources = append(res.Resources, &ResourceResponse{
- Name: r.Name,
- Configs: configEntries,
- })
- }
- }
- return res
- }
- type MockDescribeConfigsResponseWithErrorCode struct {
- t TestReporter
- }
- func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
- return &MockDescribeConfigsResponseWithErrorCode{t: t}
- }
- func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*DescribeConfigsRequest)
- res := &DescribeConfigsResponse{
- Version: req.Version,
- }
- for _, r := range req.Resources {
- res.Resources = append(res.Resources, &ResourceResponse{
- Name: r.Name,
- Type: r.Type,
- ErrorCode: 83,
- ErrorMsg: "",
- })
- }
- return res
- }
- type MockAlterConfigsResponse struct {
- t TestReporter
- }
- func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
- return &MockAlterConfigsResponse{t: t}
- }
- func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*AlterConfigsRequest)
- res := &AlterConfigsResponse{}
- for _, r := range req.Resources {
- res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
- Name: r.Name,
- Type: r.Type,
- ErrorMsg: "",
- })
- }
- return res
- }
- type MockAlterConfigsResponseWithErrorCode struct {
- t TestReporter
- }
- func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
- return &MockAlterConfigsResponseWithErrorCode{t: t}
- }
- func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*AlterConfigsRequest)
- res := &AlterConfigsResponse{}
- for _, r := range req.Resources {
- res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
- Name: r.Name,
- Type: r.Type,
- ErrorCode: 83,
- ErrorMsg: "",
- })
- }
- return res
- }
- type MockCreateAclsResponse struct {
- t TestReporter
- }
- func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
- return &MockCreateAclsResponse{t: t}
- }
- func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*CreateAclsRequest)
- res := &CreateAclsResponse{}
- for range req.AclCreations {
- res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
- }
- return res
- }
- type MockListAclsResponse struct {
- t TestReporter
- }
- func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
- return &MockListAclsResponse{t: t}
- }
- func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*DescribeAclsRequest)
- res := &DescribeAclsResponse{}
- res.Err = ErrNoError
- acl := &ResourceAcls{}
- if req.ResourceName != nil {
- acl.Resource.ResourceName = *req.ResourceName
- }
- acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
- acl.Resource.ResourceType = req.ResourceType
- host := "*"
- if req.Host != nil {
- host = *req.Host
- }
- principal := "User:test"
- if req.Principal != nil {
- principal = *req.Principal
- }
- permissionType := req.PermissionType
- if permissionType == AclPermissionAny {
- permissionType = AclPermissionAllow
- }
- acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
- res.ResourceAcls = append(res.ResourceAcls, acl)
- res.Version = int16(req.Version)
- return res
- }
- type MockSaslAuthenticateResponse struct {
- t TestReporter
- kerror KError
- saslAuthBytes []byte
- }
- func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
- return &MockSaslAuthenticateResponse{t: t}
- }
- func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
- res := &SaslAuthenticateResponse{}
- res.Err = msar.kerror
- res.SaslAuthBytes = msar.saslAuthBytes
- return res
- }
- func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
- msar.kerror = kerror
- return msar
- }
- func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
- msar.saslAuthBytes = saslAuthBytes
- return msar
- }
- type MockDeleteAclsResponse struct {
- t TestReporter
- }
- type MockSaslHandshakeResponse struct {
- enabledMechanisms []string
- kerror KError
- t TestReporter
- }
- func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
- return &MockSaslHandshakeResponse{t: t}
- }
- func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
- res := &SaslHandshakeResponse{}
- res.Err = mshr.kerror
- res.EnabledMechanisms = mshr.enabledMechanisms
- return res
- }
- func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
- mshr.kerror = kerror
- return mshr
- }
- func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
- mshr.enabledMechanisms = enabledMechanisms
- return mshr
- }
- func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
- return &MockDeleteAclsResponse{t: t}
- }
- func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*DeleteAclsRequest)
- res := &DeleteAclsResponse{}
- for range req.Filters {
- response := &FilterResponse{Err: ErrNoError}
- response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
- res.FilterResponses = append(res.FilterResponses, response)
- }
- res.Version = int16(req.Version)
- return res
- }
- type MockDeleteGroupsResponse struct {
- deletedGroups []string
- }
- func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
- return &MockDeleteGroupsResponse{}
- }
- func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
- m.deletedGroups = groups
- return m
- }
- func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- resp := &DeleteGroupsResponse{
- GroupErrorCodes: map[string]KError{},
- }
- for _, group := range m.deletedGroups {
- resp.GroupErrorCodes[group] = ErrNoError
- }
- return resp
- }
- type MockDeleteOffsetResponse struct {
- errorCode KError
- topic string
- partition int32
- errorPartition KError
- }
- func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {
- return &MockDeleteOffsetResponse{}
- }
- func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {
- m.errorCode = errorCode
- m.topic = topic
- m.partition = partition
- m.errorPartition = errorPartition
- return m
- }
- func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
- resp := &DeleteOffsetsResponse{
- ErrorCode: m.errorCode,
- Errors: map[string]map[int32]KError{
- m.topic: {m.partition: m.errorPartition},
- },
- }
- return resp
- }
- type MockJoinGroupResponse struct {
- t TestReporter
- ThrottleTime int32
- Err KError
- GenerationId int32
- GroupProtocol string
- LeaderId string
- MemberId string
- Members map[string][]byte
- }
- func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
- return &MockJoinGroupResponse{
- t: t,
- Members: make(map[string][]byte),
- }
- }
- func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*JoinGroupRequest)
- resp := &JoinGroupResponse{
- Version: req.Version,
- ThrottleTime: m.ThrottleTime,
- Err: m.Err,
- GenerationId: m.GenerationId,
- GroupProtocol: m.GroupProtocol,
- LeaderId: m.LeaderId,
- MemberId: m.MemberId,
- Members: m.Members,
- }
- return resp
- }
- func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
- m.ThrottleTime = t
- return m
- }
- func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
- m.Err = kerr
- return m
- }
- func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
- m.GenerationId = id
- return m
- }
- func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
- m.GroupProtocol = proto
- return m
- }
- func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
- m.LeaderId = id
- return m
- }
- func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
- m.MemberId = id
- return m
- }
- func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
- bin, err := encode(meta, nil)
- if err != nil {
- panic(fmt.Sprintf("error encoding member metadata: %v", err))
- }
- m.Members[id] = bin
- return m
- }
- type MockLeaveGroupResponse struct {
- t TestReporter
- Err KError
- }
- func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
- return &MockLeaveGroupResponse{t: t}
- }
- func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
- resp := &LeaveGroupResponse{
- Err: m.Err,
- }
- return resp
- }
- func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
- m.Err = kerr
- return m
- }
- type MockSyncGroupResponse struct {
- t TestReporter
- Err KError
- MemberAssignment []byte
- }
- func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
- return &MockSyncGroupResponse{t: t}
- }
- func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
- resp := &SyncGroupResponse{
- Err: m.Err,
- MemberAssignment: m.MemberAssignment,
- }
- return resp
- }
- func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
- m.Err = kerr
- return m
- }
- func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
- bin, err := encode(assignment, nil)
- if err != nil {
- panic(fmt.Sprintf("error encoding member assignment: %v", err))
- }
- m.MemberAssignment = bin
- return m
- }
- type MockHeartbeatResponse struct {
- t TestReporter
- Err KError
- }
- func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
- return &MockHeartbeatResponse{t: t}
- }
- func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
- resp := &HeartbeatResponse{}
- return resp
- }
- func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
- m.Err = kerr
- return m
- }
- type MockDescribeLogDirsResponse struct {
- t TestReporter
- logDirs []DescribeLogDirsResponseDirMetadata
- }
- func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
- return &MockDescribeLogDirsResponse{t: t}
- }
- func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
- var topics []DescribeLogDirsResponseTopic
- for topic := range topicPartitions {
- var partitions []DescribeLogDirsResponsePartition
- for i := 0; i < topicPartitions[topic]; i++ {
- partitions = append(partitions, DescribeLogDirsResponsePartition{
- PartitionID: int32(i),
- IsTemporary: false,
- OffsetLag: int64(0),
- Size: int64(1234),
- })
- }
- topics = append(topics, DescribeLogDirsResponseTopic{
- Topic: topic,
- Partitions: partitions,
- })
- }
- logDir := DescribeLogDirsResponseDirMetadata{
- ErrorCode: ErrNoError,
- Path: logDirPath,
- Topics: topics,
- }
- m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
- return m
- }
- func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- resp := &DescribeLogDirsResponse{
- LogDirs: m.logDirs,
- }
- return resp
- }
- type MockApiVersionsResponse struct {
- t TestReporter
- }
- func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {
- return &MockApiVersionsResponse{t: t}
- }
- func (mr *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
- req := reqBody.(*ApiVersionsRequest)
- res := &ApiVersionsResponse{
- Version: req.Version,
- ApiKeys: []ApiVersionsResponseKey{
- {
- Version: req.Version,
- ApiKey: 0,
- MinVersion: 5,
- MaxVersion: 8,
- },
- {
- Version: req.Version,
- ApiKey: 1,
- MinVersion: 7,
- MaxVersion: 11,
- },
- },
- }
- return res
- }
|