partitioner.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package sarama
  import (
	"fmt"
	"hash"
	"hash/fnv"
	"math/rand"
	"time"
)
  8. // Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
  9. // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
  10. // as simple default implementations.
  11. type Partitioner interface {
  12. // Partition takes a message and partition count and chooses a partition
  13. Partition(message *ProducerMessage, numPartitions int32) (int32, error)
  14. // RequiresConsistency indicates to the user of the partitioner whether the
  15. // mapping of key->partition is consistent or not. Specifically, if a
  16. // partitioner requires consistency then it must be allowed to choose from all
  17. // partitions (even ones known to be unavailable), and its choice must be
  18. // respected by the caller. The obvious example is the HashPartitioner.
  19. RequiresConsistency() bool
  20. }
  21. // DynamicConsistencyPartitioner can optionally be implemented by Partitioners
  22. // in order to allow more flexibility than is originally allowed by the
  23. // RequiresConsistency method in the Partitioner interface. This allows
  24. // partitioners to require consistency sometimes, but not all times. It's useful
  25. // for, e.g., the HashPartitioner, which does not require consistency if the
  26. // message key is nil.
  27. type DynamicConsistencyPartitioner interface {
  28. Partitioner
  29. // MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
  30. // but takes in the message being partitioned so that the partitioner can
  31. // make a per-message determination.
  32. MessageRequiresConsistency(message *ProducerMessage) bool
  33. }
  34. // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
  35. type PartitionerConstructor func(topic string) Partitioner
  36. type manualPartitioner struct{}
  37. // HashPartitionerOption lets you modify default values of the partitioner
  38. type HashPartitionerOption func(*hashPartitioner)
  39. // WithAbsFirst means that the partitioner handles absolute values
  40. // in the same way as the reference Java implementation
  41. func WithAbsFirst() HashPartitionerOption {
  42. return func(hp *hashPartitioner) {
  43. hp.referenceAbs = true
  44. }
  45. }
  46. // WithCustomHashFunction lets you specify what hash function to use for the partitioning
  47. func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
  48. return func(hp *hashPartitioner) {
  49. hp.hasher = hasher()
  50. }
  51. }
  52. // WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
  53. func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption {
  54. return func(hp *hashPartitioner) {
  55. hp.random = randomHP
  56. }
  57. }
  58. // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
  59. // ProducerMessage's Partition field as the partition to produce to.
  60. func NewManualPartitioner(topic string) Partitioner {
  61. return new(manualPartitioner)
  62. }
  63. func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  64. return message.Partition, nil
  65. }
  66. func (p *manualPartitioner) RequiresConsistency() bool {
  67. return true
  68. }
  69. type randomPartitioner struct {
  70. generator *rand.Rand
  71. }
  72. // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
  73. func NewRandomPartitioner(topic string) Partitioner {
  74. p := new(randomPartitioner)
  75. p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
  76. return p
  77. }
  78. func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  79. return int32(p.generator.Intn(int(numPartitions))), nil
  80. }
  81. func (p *randomPartitioner) RequiresConsistency() bool {
  82. return false
  83. }
  84. type roundRobinPartitioner struct {
  85. partition int32
  86. }
  87. // NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
  88. func NewRoundRobinPartitioner(topic string) Partitioner {
  89. return &roundRobinPartitioner{}
  90. }
  91. func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  92. if p.partition >= numPartitions {
  93. p.partition = 0
  94. }
  95. ret := p.partition
  96. p.partition++
  97. return ret, nil
  98. }
  99. func (p *roundRobinPartitioner) RequiresConsistency() bool {
  100. return false
  101. }
  102. type hashPartitioner struct {
  103. random Partitioner
  104. hasher hash.Hash32
  105. referenceAbs bool
  106. }
  107. // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
  108. // The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
  109. // each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
  110. func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
  111. return func(topic string) Partitioner {
  112. p := new(hashPartitioner)
  113. p.random = NewRandomPartitioner(topic)
  114. p.hasher = hasher()
  115. p.referenceAbs = false
  116. return p
  117. }
  118. }
  119. // NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
  120. func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
  121. return func(topic string) Partitioner {
  122. p := new(hashPartitioner)
  123. p.random = NewRandomPartitioner(topic)
  124. p.hasher = fnv.New32a()
  125. p.referenceAbs = false
  126. for _, option := range options {
  127. option(p)
  128. }
  129. return p
  130. }
  131. }
  132. // NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
  133. // random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
  134. // modulus the number of partitions. This ensures that messages with the same key always end up on the
  135. // same partition.
  136. func NewHashPartitioner(topic string) Partitioner {
  137. p := new(hashPartitioner)
  138. p.random = NewRandomPartitioner(topic)
  139. p.hasher = fnv.New32a()
  140. p.referenceAbs = false
  141. return p
  142. }
  143. // NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
  144. // in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
  145. // that but it had a mistake and now there are people depending on both behaviors. This will
  146. // all go away on the next major version bump.
  147. func NewReferenceHashPartitioner(topic string) Partitioner {
  148. p := new(hashPartitioner)
  149. p.random = NewRandomPartitioner(topic)
  150. p.hasher = fnv.New32a()
  151. p.referenceAbs = true
  152. return p
  153. }
  154. func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  155. if message.Key == nil {
  156. return p.random.Partition(message, numPartitions)
  157. }
  158. bytes, err := message.Key.Encode()
  159. if err != nil {
  160. return -1, err
  161. }
  162. p.hasher.Reset()
  163. _, err = p.hasher.Write(bytes)
  164. if err != nil {
  165. return -1, err
  166. }
  167. var partition int32
  168. // Turns out we were doing our absolute value in a subtly different way from the upstream
  169. // implementation, but now we need to maintain backwards compat for people who started using
  170. // the old version; if referenceAbs is set we are compatible with the reference java client
  171. // but not past Sarama versions
  172. if p.referenceAbs {
  173. partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
  174. } else {
  175. partition = int32(p.hasher.Sum32()) % numPartitions
  176. if partition < 0 {
  177. partition = -partition
  178. }
  179. }
  180. return partition, nil
  181. }
  182. func (p *hashPartitioner) RequiresConsistency() bool {
  183. return true
  184. }
  185. func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
  186. return message.Key != nil
  187. }