async_producer.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/eapache/go-resiliency/breaker"
  8. "github.com/eapache/queue"
  9. )
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks and message loss: it will not be garbage-collected automatically when it passes
// out of scope and buffered messages may not be flushed.
type AsyncProducer interface {
	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before the process
	// shuts down, or you may lose messages. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64            // broker-assigned producer ID (noProducerID when idempotence is off)
	producerEpoch   int16            // broker-assigned producer epoch (noProducerEpoch when idempotence is off)
	sequenceNumbers map[string]int32 // next sequence number per "topic-partition" key
	mutex           sync.Mutex       // guards producerEpoch and sequenceNumbers
}
// Sentinel values used before a real producer ID/epoch has been obtained
// from the broker via InitProducerID.
const (
	noProducerID    = -1
	noProducerEpoch = -1
)
  53. func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
  54. key := fmt.Sprintf("%s-%d", topic, partition)
  55. t.mutex.Lock()
  56. defer t.mutex.Unlock()
  57. sequence := t.sequenceNumbers[key]
  58. t.sequenceNumbers[key] = sequence + 1
  59. return sequence, t.producerEpoch
  60. }
  61. func (t *transactionManager) bumpEpoch() {
  62. t.mutex.Lock()
  63. defer t.mutex.Unlock()
  64. t.producerEpoch++
  65. for k := range t.sequenceNumbers {
  66. t.sequenceNumbers[k] = 0
  67. }
  68. }
  69. func (t *transactionManager) getProducerID() (int64, int16) {
  70. t.mutex.Lock()
  71. defer t.mutex.Unlock()
  72. return t.producerID, t.producerEpoch
  73. }
  74. func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
  75. txnmgr := &transactionManager{
  76. producerID: noProducerID,
  77. producerEpoch: noProducerEpoch,
  78. }
  79. if conf.Producer.Idempotent {
  80. initProducerIDResponse, err := client.InitProducerID()
  81. if err != nil {
  82. return nil, err
  83. }
  84. txnmgr.producerID = initProducerIDResponse.ProducerID
  85. txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
  86. txnmgr.sequenceNumbers = make(map[string]int32)
  87. txnmgr.mutex = sync.Mutex{}
  88. Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
  89. }
  90. return txnmgr, nil
  91. }
// asyncProducer is the concrete implementation of AsyncProducer.
type asyncProducer struct {
	client Client  // underlying Kafka client (wrapped in nopCloserClient when caller-owned)
	conf   *Config // snapshot of client.Config()

	errors                    chan *ProducerError   // delivery failures surfaced to the user
	input, successes, retries chan *ProducerMessage // user input, success output, and internal retry channels
	inFlight                  sync.WaitGroup        // tracks messages (and syn/fin markers) not yet fully handled

	brokers    map[*Broker]*brokerProducer // live brokerProducer per broker
	brokerRefs map[*brokerProducer]int     // reference counts for brokerProducer sharing
	brokerLock sync.Mutex                  // guards brokers and brokerRefs

	txnmgr *transactionManager // idempotent-production state
}
  103. // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
  104. func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  105. client, err := NewClient(addrs, conf)
  106. if err != nil {
  107. return nil, err
  108. }
  109. return newAsyncProducer(client)
  110. }
  111. // NewAsyncProducerFromClient creates a new Producer using the given client. It is still
  112. // necessary to call Close() on the underlying client when shutting down this producer.
  113. func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
  114. // For clients passed in by the client, ensure we don't
  115. // call Close() on it.
  116. cli := &nopCloserClient{client}
  117. return newAsyncProducer(cli)
  118. }
  119. func newAsyncProducer(client Client) (AsyncProducer, error) {
  120. // Check that we are not dealing with a closed Client before processing any other arguments
  121. if client.Closed() {
  122. return nil, ErrClosedClient
  123. }
  124. txnmgr, err := newTransactionManager(client.Config(), client)
  125. if err != nil {
  126. return nil, err
  127. }
  128. p := &asyncProducer{
  129. client: client,
  130. conf: client.Config(),
  131. errors: make(chan *ProducerError),
  132. input: make(chan *ProducerMessage),
  133. successes: make(chan *ProducerMessage),
  134. retries: make(chan *ProducerMessage),
  135. brokers: make(map[*Broker]*brokerProducer),
  136. brokerRefs: make(map[*brokerProducer]int),
  137. txnmgr: txnmgr,
  138. }
  139. // launch our singleton dispatchers
  140. go withRecover(p.dispatcher)
  141. go withRecover(p.retryHandler)
  142. return p, nil
  143. }
// flagSet carries producer-internal control flags on a ProducerMessage.
type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behavior depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries     int                 // number of retry passes this message has been through
	flags       flagSet             // internal syn/fin/shutdown control flags
	expectation chan *ProducerError // NOTE(review): appears to carry a per-message result to a blocking caller — confirm against SyncProducer

	sequenceNumber int32 // idempotent-producer sequence number (set when hasSequence is true)
	producerEpoch  int16 // producer epoch captured when the sequence number was assigned
	hasSequence    bool  // true once sequenceNumber/producerEpoch have been assigned
}
  194. const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
  195. func (m *ProducerMessage) byteSize(version int) int {
  196. var size int
  197. if version >= 2 {
  198. size = maximumRecordOverhead
  199. for _, h := range m.Headers {
  200. size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
  201. }
  202. } else {
  203. size = producerMessageOverhead
  204. }
  205. if m.Key != nil {
  206. size += m.Key.Length()
  207. }
  208. if m.Value != nil {
  209. size += m.Value.Length()
  210. }
  211. return size
  212. }
  213. func (m *ProducerMessage) clear() {
  214. m.flags = 0
  215. m.retries = 0
  216. m.sequenceNumber = 0
  217. m.producerEpoch = 0
  218. m.hasSequence = false
  219. }
// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage // the message that could not be delivered
	Err error            // the underlying cause of the failure
}
  226. func (pe ProducerError) Error() string {
  227. return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
  228. }
  229. func (pe ProducerError) Unwrap() error {
  230. return pe.Err
  231. }
// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

// Error implements the error interface, reporting only the count of failed
// messages; inspect the individual elements for details.
func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
  239. func (p *asyncProducer) Errors() <-chan *ProducerError {
  240. return p.errors
  241. }
  242. func (p *asyncProducer) Successes() <-chan *ProducerMessage {
  243. return p.successes
  244. }
  245. func (p *asyncProducer) Input() chan<- *ProducerMessage {
  246. return p.input
  247. }
// Close triggers an asynchronous shutdown and then blocks, draining both
// output channels until they are closed. Any errors collected while draining
// are returned as a single ProducerErrors value.
func (p *asyncProducer) Close() error {
	p.AsyncClose()

	// If successes are being returned, someone must keep reading them or the
	// shutdown would deadlock; drain them in the background.
	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		// Collect every outstanding error until the channel is closed.
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		// Errors are not returned; a single receive unblocks once the
		// channel is closed at the end of shutdown.
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
  269. func (p *asyncProducer) AsyncClose() {
  270. go withRecover(p.shutdown)
  271. }
// singleton
// dispatches messages by topic
//
// dispatcher is the producer's first routing stage: it validates incoming
// messages, tracks in-flight accounting, and forwards each message to a
// per-topic handler channel (creating one on first use). It also reacts to
// the internal shutdown flag so that new messages submitted after shutdown
// are rejected with ErrShuttingDown.
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			// internal marker from shutdown(); stop accepting fresh messages
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			// first pass through for this message; count it as in-flight
			p.inFlight.Add(1)
		}

		for _, interceptor := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(interceptor)
		}

		// Pick the message-format version used for size estimation; headers
		// require the v2 record format (Kafka >= 0.11).
		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	// input closed: propagate the shutdown downstream
	for _, handler := range handlers {
		close(handler)
	}
}
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage // fed by the asyncProducer dispatcher

	breaker     *breaker.Breaker               // circuit breaker around partition metadata lookups
	handlers    map[int32]chan<- *ProducerMessage // per-partition handler channels
	partitioner Partitioner                    // chooses the partition for each new message
}
  335. func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
  336. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  337. tp := &topicProducer{
  338. parent: p,
  339. topic: topic,
  340. input: input,
  341. breaker: breaker.New(3, 1, 10*time.Second),
  342. handlers: make(map[int32]chan<- *ProducerMessage),
  343. partitioner: p.conf.Producer.Partitioner(topic),
  344. }
  345. go withRecover(tp.dispatch)
  346. return input
  347. }
  348. func (tp *topicProducer) dispatch() {
  349. for msg := range tp.input {
  350. if msg.retries == 0 {
  351. if err := tp.partitionMessage(msg); err != nil {
  352. tp.parent.returnError(msg, err)
  353. continue
  354. }
  355. }
  356. handler := tp.handlers[msg.Partition]
  357. if handler == nil {
  358. handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
  359. tp.handlers[msg.Partition] = handler
  360. }
  361. handler <- msg
  362. }
  363. for _, handler := range tp.handlers {
  364. close(handler)
  365. }
  366. }
// partitionMessage chooses and records a partition for msg using the
// configured partitioner. The metadata lookup (writable vs. all partitions,
// depending on whether the partitioner requires consistency) runs inside the
// circuit breaker so repeated metadata failures back off.
func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		requiresConsistency := false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			// per-message consistency decision
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		// partitioner returned an index outside [0, numPartitions)
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage // fed by the topicProducer

	leader         *Broker          // current partition leader, nil until selected
	breaker        *breaker.Breaker // circuit breaker around leader refreshes
	brokerProducer *brokerProducer  // producer bound to the current leader, nil when abandoned

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

// partitionRetryState holds the buffered messages and fin-tracking flag for
// one retry level of a partitionProducer.
type partitionRetryState struct {
	buf          []*ProducerMessage // messages parked until this retry level is flushed
	expectChaser bool               // true while we still expect a fin for this level
}
  421. func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
  422. input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
  423. pp := &partitionProducer{
  424. parent: p,
  425. topic: topic,
  426. partition: partition,
  427. input: input,
  428. breaker: breaker.New(3, 1, 10*time.Second),
  429. retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
  430. }
  431. go withRecover(pp.dispatch)
  432. return input
  433. }
  434. func (pp *partitionProducer) backoff(retries int) {
  435. var backoff time.Duration
  436. if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
  437. maxRetries := pp.parent.conf.Producer.Retry.Max
  438. backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
  439. } else {
  440. backoff = pp.parent.conf.Producer.Retry.Backoff
  441. }
  442. if backoff > 0 {
  443. time.Sleep(backoff)
  444. }
  445. }
// dispatch is the partitionProducer's main loop. It forwards messages to the
// brokerProducer for the current leader while preserving ordering across
// retries: messages at a lower retry level than the current highWatermark are
// buffered until the matching fin arrives and the buffers can be flushed.
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		// release our reference on exit so the brokerProducer can be torn down
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		// Now that we know we have a broker to actually try and send this message to, generate the sequence
		// number for it.
		// All messages being retried (sent or not) have already had their retry count updated
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}

		pp.brokerProducer.input <- msg
	}
}
// newHighWatermark raises the current retry level to hwm, emits a fin marker
// for the previous level so we know when its in-flight messages have drained
// back to us, and abandons the current broker (a new retry level implies the
// leader selection is stale).
func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}
// flushRetryBuffers walks the retry levels downward from the current
// highWatermark, re-sending each level's buffered messages to the (possibly
// refreshed) leader. It stops early at a level still awaiting its fin, or
// when level zero (normal operation) is reached.
func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				// no leader available: fail this level's buffered messages
				// but still clear the buffer below
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			// this level's fin hasn't come back yet; stay in retrying state
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}
// updateLeader refreshes topic metadata, re-resolves the partition leader,
// and attaches to that leader's brokerProducer (announcing ourselves with a
// syn message). The whole sequence runs inside the circuit breaker so
// persistent metadata failures back off.
func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}
// one per broker; also constructs an associated flusher
//
// newBrokerProducer creates a brokerProducer for the given broker, starting
// its run loop and a bridge goroutine that performs the blocking network
// Produce call so the run loop can treat responses as a select-able channel.
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			// blocking network round-trip; one produce request at a time
			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		// bridge closed by bp.shutdown(); signal the run loop that no more
		// responses will arrive
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		// retries disabled: provide the abandoned channel so
		// partitionProducers can detect a dead broker selection
		bp.abandoned = make(chan struct{})
	}

	return bp
}
// brokerProducerResponse pairs a sent produceSet with the outcome of its
// network round-trip (transport error and/or broker response).
type brokerProducerResponse struct {
	set *produceSet      // the batch this response corresponds to
	err error            // transport-level error from Produce, if any
	res *ProduceResponse // broker response; may be nil (e.g. RequiredAcks == NoResponse)
}
// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage         // messages from partitionProducers
	output    chan<- *produceSet            // flushed batches toward the network bridge
	responses <-chan *brokerProducerResponse // results coming back from the bridge
	abandoned chan struct{}                 // non-nil only when retries are disabled; signals stale broker selection
	stopchan  chan struct{}                 // closed to stop the run loop

	buffer     *produceSet   // batch currently being accumulated
	timer      <-chan time.Time // flush-frequency timer, nil when not armed
	timerFired bool          // set when the timer fired and a flush is due

	closing        error                     // non-nil once the producer is shutting down; forces retries
	currentRetries map[string]map[int32]error // per topic/partition retry reason, nil entry = healthy
}
// run is the brokerProducer's main loop. It accumulates incoming messages
// into bp.buffer, flushes the buffer when the flush timer fires or the
// produce set reports readiness, and processes responses from the network
// bridge. The output select arm is gated by assigning either bp.output or
// nil to the local `output` variable: a nil channel can never be selected,
// so flushing is only possible when there is something worth sending.
func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				// a partitionProducer announced itself; (re)open its
				// topic/partition for processing
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, false); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
				// The epoch was reset, need to roll the buffer over
				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, true); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}
			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			// arm the flush timer on the first buffered message
			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		case <-bp.stopchan:
			Logger.Printf(
				"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
			return
		}

		// enable the flush arm only when there is something ready to send
		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
// shutdown flushes any remaining buffered messages, closes the bridge
// channel (which in turn closes responses), drains the remaining responses,
// and finally signals the run loop via stopchan.
func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	// closing output stops the bridge goroutine, which then closes responses
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}
	close(bp.stopchan)
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}
  719. func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
  720. if bp.closing != nil {
  721. return bp.closing
  722. }
  723. return bp.currentRetries[msg.Topic][msg.Partition]
  724. }
// waitForSpace blocks until msg can be accepted into the buffer: either the
// current buffer is handed off to output (and rolled over), or handling a
// response changes state so the message fits. It returns a non-nil error if
// the message must be retried instead. With forceRollover set, only a full
// buffer hand-off satisfies the wait (used when the producer epoch rolled
// over), not mere shrinkage.
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}
  742. func (bp *brokerProducer) rollOver() {
  743. bp.timer = nil
  744. bp.timerFired = false
  745. bp.buffer = newProduceSet(bp.parent)
  746. }
  747. func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
  748. if response.err != nil {
  749. bp.handleError(response.set, response.err)
  750. } else {
  751. bp.handleSuccess(response.set, response.res)
  752. }
  753. if bp.buffer.empty() {
  754. bp.rollOver() // this can happen if the response invalidated our buffer
  755. }
  756. }
// handleSuccess processes a ProduceResponse for the set of messages in sent.
// It completes successful partitions, fails non-retriable ones, and — in a
// second pass — flips retriable partitions into [retrying] state and requeues
// their messages.
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}
		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}
		switch block.Err {
		// Success
		case ErrNoError:
			// Stamp the broker-assigned (log-append) timestamp onto each
			// message when the broker supplied one.
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			// Offsets within a batch are consecutive starting at block.Offset.
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			// The broker already has these messages; treat as success.
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				// Retries disabled: give up on this broker and fail the messages.
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				// Defer the actual retry work to the second pass below.
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})
	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			// Idempotent retries go straight to the (possibly new) leader, so
			// refresh metadata first; retryBatch relies on this having happened.
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}
		// Second pass: mark each retriable partition as [retrying] and requeue
		// both the sent messages and anything still buffered for that partition.
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}
			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}
  837. func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
  838. Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
  839. produceSet := newProduceSet(p)
  840. produceSet.msgs[topic] = make(map[int32]*partitionSet)
  841. produceSet.msgs[topic][partition] = pSet
  842. produceSet.bufferBytes += pSet.bufferBytes
  843. produceSet.bufferCount += len(pSet.msgs)
  844. for _, msg := range pSet.msgs {
  845. if msg.retries >= p.conf.Producer.Retry.Max {
  846. p.returnError(msg, kerr)
  847. return
  848. }
  849. msg.retries++
  850. }
  851. // it's expected that a metadata refresh has been requested prior to calling retryBatch
  852. leader, err := p.client.Leader(topic, partition)
  853. if err != nil {
  854. Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
  855. for _, msg := range pSet.msgs {
  856. p.returnError(msg, kerr)
  857. }
  858. return
  859. }
  860. bp := p.getBrokerProducer(leader)
  861. bp.output <- produceSet
  862. }
  863. func (bp *brokerProducer) handleError(sent *produceSet, err error) {
  864. switch err.(type) {
  865. case PacketEncodingError:
  866. sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  867. bp.parent.returnErrors(pSet.msgs, err)
  868. })
  869. default:
  870. Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
  871. bp.parent.abandonBrokerConnection(bp.broker)
  872. _ = bp.broker.Close()
  873. bp.closing = err
  874. sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  875. bp.parent.retryMessages(pSet.msgs, err)
  876. })
  877. bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
  878. bp.parent.retryMessages(pSet.msgs, err)
  879. })
  880. bp.rollOver()
  881. }
  882. }
// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock:
// it buffers an unbounded number of retried messages so neither side ever blocks the other.
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()
	for {
		if buf.Length() == 0 {
			// Nothing queued: block until a retried message arrives.
			msg = <-p.retries
		} else {
			// Either accept another retry or feed the oldest queued message
			// back into the dispatcher, whichever side is ready first.
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}
		// A nil message means p.retries was closed (see shutdown): exit.
		if msg == nil {
			return
		}
		buf.Add(msg)
	}
}
  906. // utility functions
// shutdown tears the producer down in order: flush everything in flight,
// close the embedded client, then close all channels (which also stops the
// retryHandler goroutine via the closed retries channel).
func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	// Count the shutdown marker itself so the Wait below covers it too.
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}
	// Block until every outstanding message has been returned as a success,
	// error, or retry-exhaustion.
	p.inFlight.Wait()
	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}
	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}
  921. func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
  922. // We need to reset the producer ID epoch if we set a sequence number on it, because the broker
  923. // will never see a message with this number, so we can never continue the sequence.
  924. if msg.hasSequence {
  925. Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
  926. p.txnmgr.bumpEpoch()
  927. }
  928. msg.clear()
  929. pErr := &ProducerError{Msg: msg, Err: err}
  930. if p.conf.Producer.Return.Errors {
  931. p.errors <- pErr
  932. } else {
  933. Logger.Println(pErr)
  934. }
  935. p.inFlight.Done()
  936. }
  937. func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
  938. for _, msg := range batch {
  939. p.returnError(msg, err)
  940. }
  941. }
  942. func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
  943. for _, msg := range batch {
  944. if p.conf.Producer.Return.Successes {
  945. msg.clear()
  946. p.successes <- msg
  947. }
  948. p.inFlight.Done()
  949. }
  950. }
  951. func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
  952. if msg.retries >= p.conf.Producer.Retry.Max {
  953. p.returnError(msg, err)
  954. } else {
  955. msg.retries++
  956. p.retries <- msg
  957. }
  958. }
  959. func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
  960. for _, msg := range batch {
  961. p.retryMessage(msg, err)
  962. }
  963. }
  964. func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
  965. p.brokerLock.Lock()
  966. defer p.brokerLock.Unlock()
  967. bp := p.brokers[broker]
  968. if bp == nil {
  969. bp = p.newBrokerProducer(broker)
  970. p.brokers[broker] = bp
  971. p.brokerRefs[bp] = 0
  972. }
  973. p.brokerRefs[bp]++
  974. return bp
  975. }
  976. func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
  977. p.brokerLock.Lock()
  978. defer p.brokerLock.Unlock()
  979. p.brokerRefs[bp]--
  980. if p.brokerRefs[bp] == 0 {
  981. close(bp.input)
  982. delete(p.brokerRefs, bp)
  983. if p.brokers[broker] == bp {
  984. delete(p.brokers, broker)
  985. }
  986. }
  987. }
  988. func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
  989. p.brokerLock.Lock()
  990. defer p.brokerLock.Unlock()
  991. bc, ok := p.brokers[broker]
  992. if ok && bc.abandoned != nil {
  993. close(bc.abandoned)
  994. }
  995. delete(p.brokers, broker)
  996. }