package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is canceled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	//
	// This method should be called inside an infinite loop; when a
	// server-side rebalance happens, the consumer session will need to be
	// recreated to get the new claims.
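	//
	// A minimal calling-loop sketch (the ctx, group, topics and handler
	// variables are assumed to exist; error handling is elided):
	//
	//	for {
	//		if err := group.Consume(ctx, topics, handler); err != nil {
	//			break // handle or log the error as appropriate
	//		}
	//		if ctx.Err() != nil {
	//			return // context was canceled: stop for good
	//		}
	//	}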
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
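	//
	// A minimal draining sketch, assuming Config.Consumer.Return.Errors has
	// been set to true (group is assumed to exist):
	//
	//	go func() {
	//		for err := range group.Errors() {
	//			Logger.Println("consumer group error:", err)
	//		}
	//	}()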
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}

type consumerGroup struct {
	client   Client
	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once
	userData  []byte
}

// NewConsumerGroup creates a new consumer group for the given broker addresses and configuration.
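//
// A minimal construction sketch (the broker address and group name are
// placeholders; error handling is elided):
//
//	config := NewConfig()
//	config.Version = V2_0_0_0 // consumer groups require Version >= V0_10_2_0
//	group, err := NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
//	if err != nil {
//		// handle error
//	}
//	defer group.Close()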
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
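//
// A minimal sketch of re-using an existing client (error handling elided):
//
//	client, err := NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		// handle error
//	}
//	defer client.Close() // the caller remains responsible for the client
//	group, err := NewConsumerGroupFromClient("example-group", client)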
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
		userData: config.Consumer.Group.Member.UserData,
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Periodically check whether the number of partitions for any consumed
	// topic has changed; if so, cancel the session to trigger a rebalance.
	// The goroutine exits with the session, so calling Consume again does
	// not accumulate extra checker goroutines.
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			// consume a retry here as well, so repeated refresh failures
			// cannot recurse indefinitely
			if retries <= 0 {
				return nil, err
			}
			return c.retryNewSession(ctx, topics, handler, retries-1, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	var (
		metricRegistry          = c.config.MetricRegistry
		consumerGroupJoinTotal  metrics.Counter
		consumerGroupJoinFailed metrics.Counter
		consumerGroupSyncTotal  metrics.Counter
		consumerGroupSyncFailed metrics.Counter
	)

	if metricRegistry != nil {
		consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry)
		consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry)
		consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry)
		consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if consumerGroupJoinTotal != nil {
		consumerGroupJoinTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
		return nil, err
	}
	if join.Err != ErrNoError {
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
	}

	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if consumerGroupSyncTotal != nil {
		consumerGroupSyncTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
		return nil, err
	}
	if groupRequest.Err != ErrNoError {
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
	}

	switch groupRequest.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, groupRequest.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, groupRequest.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, groupRequest.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(groupRequest.MemberAssignment) > 0 {
		members, err := groupRequest.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics

		// in the case of stateful balance strategies, hold on to the returned
		// assignment metadata; otherwise, reset the statically defined consumer
		// group metadata
		if members.UserData != nil {
			c.userData = members.UserData
		} else {
			c.userData = c.config.Consumer.Group.Member.UserData
		}

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.userData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	for memberID, topics := range plan {
		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
		userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
		if err != nil {
			return nil, err
		}
		assignment.UserData = userDataBytes
		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}
	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if !c.config.Consumer.Return.Errors {
		Logger.Println(err)
		return
	}

	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
	defer session.cancel()
	defer pause.Stop()

	var oldTopicToPartitionNum map[string]int
	var err error
	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	}

	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					return // trigger the end of the session on exit
				}
			}
		}

		select {
		case <-pause.C:
		case <-session.ctx.Done():
			Logger.Printf("loop check partition number goroutine will exit, topics %s", topics)
			// the session was closed elsewhere, so exit here as well
			return
		case <-c.closed:
			return
		}
	}
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
	topicToPartitionNum := make(map[string]int, len(topics))
	for _, topic := range topics {
		if partitionNum, err := c.client.Partitions(topic); err != nil {
			Logger.Printf("Consumer Group failed to get the partition number of topic %s: %v", topic, err)
			return nil, err
		} else {
			topicToPartitionNum[topic] = len(partitionNum)
		}
	}
	return topicToPartitionNum, nil
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
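	//
	// A minimal sketch of the add-one convention (msg is a received
	// *ConsumerMessage; MarkMessage below does the same thing for you):
	//
	//	session.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")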
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// Commit the offset to the backend
	//
	// Note: calling Commit performs a blocking synchronous operation.
	Commit()

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)
	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) Commit() {
	s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
	defer retryBackoff.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}
			retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
			select {
			case <-s.hbDying:
				return
			case <-retryBackoff.C:
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// The interface also provides hooks for your consumer group session life-cycle and
// allows you to trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
// ensure that all state is safely protected against race conditions.
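//
// A minimal handler sketch (the exampleHandler name is hypothetical and not
// part of this package):
//
//	type exampleHandler struct{}
//
//	func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
//	func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }
//	func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//		for msg := range claim.Messages() {
//			// process msg, then mark it as consumed
//			sess.MarkMessage(msg, "")
//		}
//		return nil
//	}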
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
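	//
	// A minimal lag sketch (msg is the last message received from Messages();
	// since the high water mark is the offset of the *next* message to be
	// produced, the number of messages still unread after msg is):
	//
	//	lag := claim.HighWaterMarkOffset() - msg.Offset - 1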
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}