balance_strategy.go 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136
  1. package sarama
  2. import (
  3. "container/heap"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "sort"
  8. "strings"
  9. )
const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
	RangeBalanceStrategyName = "range"

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
	RoundRobinBalanceStrategyName = "roundrobin"

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
	StickyBalanceStrategyName = "sticky"

	// NOTE(review): defaultGeneration is not referenced in this part of the
	// file; presumably it marks assignment data that carries no generation —
	// confirm against its users.
	defaultGeneration = -1
)
  19. // BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
  20. // It contains an allocation of topic/partitions by memberID in the form of
  21. // a `memberID -> topic -> partitions` map.
  22. type BalanceStrategyPlan map[string]map[string][]int32
  23. // Add assigns a topic with a number partitions to a member.
  24. func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
  25. if len(partitions) == 0 {
  26. return
  27. }
  28. if _, ok := p[memberID]; !ok {
  29. p[memberID] = make(map[string][]int32, 1)
  30. }
  31. p[memberID][topic] = append(p[memberID][topic], partitions...)
  32. }
  33. // --------------------------------------------------------------------
// BalanceStrategy is used to balance topics and partitions
// across members of a consumer group.
type BalanceStrategy interface {
	// Name uniquely identifies the strategy.
	Name() string

	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
	// and returns a distribution plan in the form `memberID -> topic -> partitions`.
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)

	// AssignmentData returns the serialized assignment data for the specified
	// memberID. Implementations that need no such data may return nil.
	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
  46. // --------------------------------------------------------------------
  47. // BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
  48. // Example with one topic T with six partitions (0..5) and two members (M1, M2):
  49. // M1: {T: [0, 1, 2]}
  50. // M2: {T: [3, 4, 5]}
  51. var BalanceStrategyRange = &balanceStrategy{
  52. name: RangeBalanceStrategyName,
  53. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  54. step := float64(len(partitions)) / float64(len(memberIDs))
  55. for i, memberID := range memberIDs {
  56. pos := float64(i)
  57. min := int(math.Floor(pos*step + 0.5))
  58. max := int(math.Floor((pos+1)*step + 0.5))
  59. plan.Add(memberID, topic, partitions[min:max]...)
  60. }
  61. },
  62. }
// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
// while maintaining a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
//   M1: {T: [0, 2, 4]}
//   M2: {T: [1, 3, 5]}
//
// On reassignment with an additional consumer, you might get an assignment plan like:
//   M1: {T: [0, 2]}
//   M2: {T: [1, 3]}
//   M3: {T: [4, 5]}
//
var BalanceStrategySticky = &stickyBalanceStrategy{}
  75. // --------------------------------------------------------------------
// balanceStrategy is a BalanceStrategy whose plan is built one topic at a
// time by a pluggable function.
type balanceStrategy struct {
	// name is the value reported by Name.
	name string
	// coreFn adds to plan the assignment of one topic's partitions among the
	// given memberIDs.
	coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
}

// Name implements BalanceStrategy.
func (s *balanceStrategy) Name() string { return s.name }
  82. // Plan implements BalanceStrategy.
  83. func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  84. // Build members by topic map
  85. mbt := make(map[string][]string)
  86. for memberID, meta := range members {
  87. for _, topic := range meta.Topics {
  88. mbt[topic] = append(mbt[topic], memberID)
  89. }
  90. }
  91. // Sort members for each topic
  92. for topic, memberIDs := range mbt {
  93. sort.Sort(&balanceStrategySortable{
  94. topic: topic,
  95. memberIDs: memberIDs,
  96. })
  97. }
  98. // Assemble plan
  99. plan := make(BalanceStrategyPlan, len(members))
  100. for topic, memberIDs := range mbt {
  101. s.coreFn(plan, memberIDs, topic, topics[topic])
  102. }
  103. return plan, nil
  104. }
// AssignmentData implements BalanceStrategy. Simple strategies do not require
// any shared assignment data, so it always returns nil data and a nil error.
func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	return nil, nil
}
  109. type balanceStrategySortable struct {
  110. topic string
  111. memberIDs []string
  112. }
  113. func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
  114. func (p balanceStrategySortable) Swap(i, j int) {
  115. p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
  116. }
  117. func (p balanceStrategySortable) Less(i, j int) bool {
  118. return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
  119. }
  120. func balanceStrategyHashValue(vv ...string) uint32 {
  121. h := uint32(2166136261)
  122. for _, s := range vv {
  123. for _, c := range s {
  124. h ^= uint32(c)
  125. h *= 16777619
  126. }
  127. }
  128. return h
  129. }
// stickyBalanceStrategy is the BalanceStrategy behind BalanceStrategySticky.
type stickyBalanceStrategy struct {
	// movements is replaced at the start of every Plan call and records the
	// partition movements made while that plan is generated.
	movements partitionMovements
}

// Name implements BalanceStrategy.
func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
  135. // Plan implements BalanceStrategy.
  136. func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  137. // track partition movements during generation of the partition assignment plan
  138. s.movements = partitionMovements{
  139. Movements: make(map[topicPartitionAssignment]consumerPair),
  140. PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
  141. }
  142. // prepopulate the current assignment state from userdata on the consumer group members
  143. currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
  144. if err != nil {
  145. return nil, err
  146. }
  147. // determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
  148. isFreshAssignment := false
  149. if len(currentAssignment) == 0 {
  150. isFreshAssignment = true
  151. }
  152. // create a mapping of all current topic partitions and the consumers that can be assigned to them
  153. partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
  154. for topic, partitions := range topics {
  155. for _, partition := range partitions {
  156. partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
  157. }
  158. }
  159. // create a mapping of all consumers to all potential topic partitions that can be assigned to them
  160. // also, populate the mapping of partitions to potential consumers
  161. consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
  162. for memberID, meta := range members {
  163. consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
  164. for _, topicSubscription := range meta.Topics {
  165. // only evaluate topic subscriptions that are present in the supplied topics map
  166. if _, found := topics[topicSubscription]; found {
  167. for _, partition := range topics[topicSubscription] {
  168. topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
  169. consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
  170. partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
  171. }
  172. }
  173. }
  174. // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
  175. if _, exists := currentAssignment[memberID]; !exists {
  176. currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
  177. }
  178. }
  179. // create a mapping of each partition to its current consumer, where possible
  180. currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
  181. unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  182. for partition := range partition2AllPotentialConsumers {
  183. unvisitedPartitions[partition] = true
  184. }
  185. var unassignedPartitions []topicPartitionAssignment
  186. for memberID, partitions := range currentAssignment {
  187. var keepPartitions []topicPartitionAssignment
  188. for _, partition := range partitions {
  189. // If this partition no longer exists at all, likely due to the
  190. // topic being deleted, we remove the partition from the member.
  191. if _, exists := partition2AllPotentialConsumers[partition]; !exists {
  192. continue
  193. }
  194. delete(unvisitedPartitions, partition)
  195. currentPartitionConsumers[partition] = memberID
  196. if !strsContains(members[memberID].Topics, partition.Topic) {
  197. unassignedPartitions = append(unassignedPartitions, partition)
  198. continue
  199. }
  200. keepPartitions = append(keepPartitions, partition)
  201. }
  202. currentAssignment[memberID] = keepPartitions
  203. }
  204. for unvisited := range unvisitedPartitions {
  205. unassignedPartitions = append(unassignedPartitions, unvisited)
  206. }
  207. // sort the topic partitions in order of priority for reassignment
  208. sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
  209. // at this point we have preserved all valid topic partition to consumer assignments and removed
  210. // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
  211. // to consumers so that the topic partition assignments are as balanced as possible.
  212. // an ascending sorted set of consumers based on how many topic partitions are already assigned to them
  213. sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
  214. s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
  215. // Assemble plan
  216. plan := make(BalanceStrategyPlan, len(currentAssignment))
  217. for memberID, assignments := range currentAssignment {
  218. if len(assignments) == 0 {
  219. plan[memberID] = make(map[string][]int32)
  220. } else {
  221. for _, assignment := range assignments {
  222. plan.Add(memberID, assignment.Topic, assignment.Partition)
  223. }
  224. }
  225. }
  226. return plan, nil
  227. }
  228. // AssignmentData serializes the set of topics currently assigned to the
  229. // specified member as part of the supplied balance plan
  230. func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
  231. return encode(&StickyAssignorUserDataV1{
  232. Topics: topics,
  233. Generation: generationID,
  234. }, nil)
  235. }
  236. func strsContains(s []string, value string) bool {
  237. for _, entry := range s {
  238. if entry == value {
  239. return true
  240. }
  241. }
  242. return false
  243. }
  244. // Balance assignments across consumers for maximum fairness and stickiness.
  245. func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
  246. initializing := false
  247. if len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0 {
  248. initializing = true
  249. }
  250. // assign all unassigned partitions
  251. for _, partition := range unassignedPartitions {
  252. // skip if there is no potential consumer for the partition
  253. if len(partition2AllPotentialConsumers[partition]) == 0 {
  254. continue
  255. }
  256. sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
  257. }
  258. // narrow down the reassignment scope to only those partitions that can actually be reassigned
  259. for partition := range partition2AllPotentialConsumers {
  260. if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  261. sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
  262. }
  263. }
  264. // narrow down the reassignment scope to only those consumers that are subject to reassignment
  265. fixedAssignments := make(map[string][]topicPartitionAssignment)
  266. for memberID := range consumer2AllPotentialPartitions {
  267. if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
  268. fixedAssignments[memberID] = currentAssignment[memberID]
  269. delete(currentAssignment, memberID)
  270. sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
  271. }
  272. }
  273. // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
  274. preBalanceAssignment := deepCopyAssignment(currentAssignment)
  275. preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
  276. for k, v := range currentPartitionConsumer {
  277. preBalancePartitionConsumers[k] = v
  278. }
  279. reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
  280. // if we are not preserving existing assignments and we have made changes to the current assignment
  281. // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
  282. if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
  283. currentAssignment = deepCopyAssignment(preBalanceAssignment)
  284. currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
  285. for k, v := range preBalancePartitionConsumers {
  286. currentPartitionConsumer[k] = v
  287. }
  288. }
  289. // add the fixed assignments (those that could not change) back
  290. for consumer, assignments := range fixedAssignments {
  291. currentAssignment[consumer] = assignments
  292. }
  293. }
// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumers (m0, m1), and each topic has three partitions (p0, p1, p2):
//   M0: [t0p0, t0p2, t1p1]
//   M1: [t0p1, t1p0, t1p2]
var BalanceStrategyRoundRobin = new(roundRobinBalancer)

// roundRobinBalancer is the stateless BalanceStrategy behind
// BalanceStrategyRoundRobin.
type roundRobinBalancer struct{}

// Name implements BalanceStrategy.
func (b *roundRobinBalancer) Name() string {
	return RoundRobinBalanceStrategyName
}
  303. func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  304. if len(memberAndMetadata) == 0 || len(topics) == 0 {
  305. return nil, errors.New("members and topics are not provided")
  306. }
  307. // sort partitions
  308. var topicPartitions []topicAndPartition
  309. for topic, partitions := range topics {
  310. for _, partition := range partitions {
  311. topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
  312. }
  313. }
  314. sort.SliceStable(topicPartitions, func(i, j int) bool {
  315. pi := topicPartitions[i]
  316. pj := topicPartitions[j]
  317. return pi.comparedValue() < pj.comparedValue()
  318. })
  319. // sort members
  320. var members []memberAndTopic
  321. for memberID, meta := range memberAndMetadata {
  322. m := memberAndTopic{
  323. memberID: memberID,
  324. topics: make(map[string]struct{}),
  325. }
  326. for _, t := range meta.Topics {
  327. m.topics[t] = struct{}{}
  328. }
  329. members = append(members, m)
  330. }
  331. sort.SliceStable(members, func(i, j int) bool {
  332. mi := members[i]
  333. mj := members[j]
  334. return mi.memberID < mj.memberID
  335. })
  336. // assign partitions
  337. plan := make(BalanceStrategyPlan, len(members))
  338. i := 0
  339. n := len(members)
  340. for _, tp := range topicPartitions {
  341. m := members[i%n]
  342. for !m.hasTopic(tp.topic) {
  343. i++
  344. m = members[i%n]
  345. }
  346. plan.Add(m.memberID, tp.topic, tp.partition)
  347. i++
  348. }
  349. return plan, nil
  350. }
// AssignmentData implements BalanceStrategy. The round-robin strategy carries
// no assignment data, so it always returns nil data and a nil error.
func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
	return nil, nil // do nothing for now
}
  354. type topicAndPartition struct {
  355. topic string
  356. partition int32
  357. }
  358. func (tp *topicAndPartition) comparedValue() string {
  359. return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
  360. }
  361. type memberAndTopic struct {
  362. memberID string
  363. topics map[string]struct{}
  364. }
  365. func (m *memberAndTopic) hasTopic(topic string) bool {
  366. _, isExist := m.topics[topic]
  367. return isExist
  368. }
  369. // Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
  370. // A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
  371. // Lower balance score indicates a more balanced assignment.
  372. func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
  373. consumer2AssignmentSize := make(map[string]int, len(assignment))
  374. for memberID, partitions := range assignment {
  375. consumer2AssignmentSize[memberID] = len(partitions)
  376. }
  377. var score float64
  378. for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
  379. delete(consumer2AssignmentSize, memberID)
  380. for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
  381. score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
  382. }
  383. }
  384. return int(score)
  385. }
  386. // Determine whether the current assignment plan is balanced.
  387. func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
  388. sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
  389. min := len(currentAssignment[sortedCurrentSubscriptions[0]])
  390. max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
  391. if min >= max-1 {
  392. // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
  393. return true
  394. }
  395. // create a mapping from partitions to the consumer assigned to them
  396. allPartitions := make(map[topicPartitionAssignment]string)
  397. for memberID, partitions := range currentAssignment {
  398. for _, partition := range partitions {
  399. if _, exists := allPartitions[partition]; exists {
  400. Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
  401. }
  402. allPartitions[partition] = memberID
  403. }
  404. }
  405. // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
  406. // could but did not get cannot be moved to it (because that would break the balance)
  407. for _, memberID := range sortedCurrentSubscriptions {
  408. consumerPartitions := currentAssignment[memberID]
  409. consumerPartitionCount := len(consumerPartitions)
  410. // skip if this consumer already has all the topic partitions it can get
  411. if consumerPartitionCount == len(allSubscriptions[memberID]) {
  412. continue
  413. }
  414. // otherwise make sure it cannot get any more
  415. potentialTopicPartitions := allSubscriptions[memberID]
  416. for _, partition := range potentialTopicPartitions {
  417. if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
  418. otherConsumer := allPartitions[partition]
  419. otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
  420. if consumerPartitionCount < otherConsumerPartitionCount {
  421. return false
  422. }
  423. }
  424. }
  425. }
  426. return true
  427. }
  428. // Reassign all topic partitions that need reassignment until balanced.
  429. func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
  430. reassignmentPerformed := false
  431. modified := false
  432. // repeat reassignment until no partition can be moved to improve the balance
  433. for {
  434. modified = false
  435. // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
  436. // until the full list is processed or a balance is achieved
  437. for _, partition := range reassignablePartitions {
  438. if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
  439. break
  440. }
  441. // the partition must have at least two consumers
  442. if len(partition2AllPotentialConsumers[partition]) <= 1 {
  443. Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
  444. }
  445. // the partition must have a consumer
  446. consumer := currentPartitionConsumer[partition]
  447. if consumer == "" {
  448. Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
  449. }
  450. if _, exists := prevAssignment[partition]; exists {
  451. if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
  452. sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
  453. reassignmentPerformed = true
  454. modified = true
  455. continue
  456. }
  457. }
  458. // check if a better-suited consumer exists for the partition; if so, reassign it
  459. for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
  460. if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
  461. sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
  462. reassignmentPerformed = true
  463. modified = true
  464. break
  465. }
  466. }
  467. }
  468. if !modified {
  469. return reassignmentPerformed
  470. }
  471. }
  472. }
  473. // Identify a new consumer for a topic partition and reassign it.
  474. func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
  475. for _, anotherConsumer := range sortedCurrentSubscriptions {
  476. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
  477. return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
  478. }
  479. }
  480. return sortedCurrentSubscriptions
  481. }
  482. // Reassign a specific partition to a new consumer
  483. func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
  484. consumer := currentPartitionConsumer[partition]
  485. // find the correct partition movement considering the stickiness requirement
  486. partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
  487. return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
  488. }
  489. // Track the movement of a topic partition after assignment
  490. func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  491. oldConsumer := currentPartitionConsumer[partition]
  492. s.movements.movePartition(partition, oldConsumer, newConsumer)
  493. currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
  494. currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
  495. currentPartitionConsumer[partition] = newConsumer
  496. return sortMemberIDsByPartitionAssignments(currentAssignment)
  497. }
  498. // Determine whether a specific consumer should be considered for topic partition assignment.
  499. func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  500. currentPartitions := currentAssignment[memberID]
  501. currentAssignmentSize := len(currentPartitions)
  502. maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
  503. if currentAssignmentSize > maxAssignmentSize {
  504. Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
  505. }
  506. if currentAssignmentSize < maxAssignmentSize {
  507. // if a consumer is not assigned all its potential partitions it is subject to reassignment
  508. return true
  509. }
  510. for _, partition := range currentPartitions {
  511. if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  512. return true
  513. }
  514. }
  515. return false
  516. }
  517. // Only consider reassigning those topic partitions that have two or more potential consumers.
  518. func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  519. return len(partition2AllPotentialConsumers[partition]) >= 2
  520. }
  521. // The assignment should improve the overall balance of the partition assignments to consumers.
  522. func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  523. for _, memberID := range sortedCurrentSubscriptions {
  524. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
  525. currentAssignment[memberID] = append(currentAssignment[memberID], partition)
  526. currentPartitionConsumer[partition] = memberID
  527. break
  528. }
  529. }
  530. return sortMemberIDsByPartitionAssignments(currentAssignment)
  531. }
  532. // Deserialize topic partition assignment data to aid with creation of a sticky assignment.
  533. func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
  534. userDataV1 := &StickyAssignorUserDataV1{}
  535. if err := decode(userDataBytes, userDataV1); err != nil {
  536. userDataV0 := &StickyAssignorUserDataV0{}
  537. if err := decode(userDataBytes, userDataV0); err != nil {
  538. return nil, err
  539. }
  540. return userDataV0, nil
  541. }
  542. return userDataV1, nil
  543. }
  544. // filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
  545. // to those topic partitions currently reported by the Kafka cluster.
  546. func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
  547. assignments := deepCopyAssignment(currentAssignment)
  548. for memberID, partitions := range assignments {
  549. // perform in-place filtering
  550. i := 0
  551. for _, partition := range partitions {
  552. if _, exists := partition2AllPotentialConsumers[partition]; exists {
  553. partitions[i] = partition
  554. i++
  555. }
  556. }
  557. assignments[memberID] = partitions[:i]
  558. }
  559. return assignments
  560. }
  561. func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
  562. for i, assignment := range assignments {
  563. if assignment == topic {
  564. return append(assignments[:i], assignments[i+1:]...)
  565. }
  566. }
  567. return assignments
  568. }
  569. func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
  570. for _, assignment := range assignments {
  571. if assignment == topic {
  572. return true
  573. }
  574. }
  575. return false
  576. }
  577. func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
  578. unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  579. for partition := range partition2AllPotentialConsumers {
  580. unassignedPartitions[partition] = true
  581. }
  582. sortedPartitions := make([]topicPartitionAssignment, 0)
  583. if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
  584. // if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
  585. // then we just need to simply list partitions in a round robin fashion (from consumers with
  586. // most assigned partitions to those with least)
  587. assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
  588. // use priority-queue to evaluate consumer group members in descending-order based on
  589. // the number of topic partition assignments (i.e. consumers with most assignments first)
  590. pq := make(assignmentPriorityQueue, len(assignments))
  591. i := 0
  592. for consumerID, consumerAssignments := range assignments {
  593. pq[i] = &consumerGroupMember{
  594. id: consumerID,
  595. assignments: consumerAssignments,
  596. }
  597. i++
  598. }
  599. heap.Init(&pq)
  600. for {
  601. // loop until no consumer-group members remain
  602. if pq.Len() == 0 {
  603. break
  604. }
  605. member := pq[0]
  606. // partitions that were assigned to a different consumer last time
  607. var prevPartitionIndex int
  608. for i, partition := range member.assignments {
  609. if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
  610. prevPartitionIndex = i
  611. break
  612. }
  613. }
  614. if len(member.assignments) > 0 {
  615. partition := member.assignments[prevPartitionIndex]
  616. sortedPartitions = append(sortedPartitions, partition)
  617. delete(unassignedPartitions, partition)
  618. if prevPartitionIndex == 0 {
  619. member.assignments = member.assignments[1:]
  620. } else {
  621. member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
  622. }
  623. heap.Fix(&pq, 0)
  624. } else {
  625. heap.Pop(&pq)
  626. }
  627. }
  628. for partition := range unassignedPartitions {
  629. sortedPartitions = append(sortedPartitions, partition)
  630. }
  631. } else {
  632. // an ascending sorted set of topic partitions based on how many consumers can potentially use them
  633. sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
  634. }
  635. return sortedPartitions
  636. }
  637. func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
  638. // sort the members by the number of partition assignments in ascending order
  639. sortedMemberIDs := make([]string, 0, len(assignments))
  640. for memberID := range assignments {
  641. sortedMemberIDs = append(sortedMemberIDs, memberID)
  642. }
  643. sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
  644. ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
  645. if ret == 0 {
  646. return sortedMemberIDs[i] < sortedMemberIDs[j]
  647. }
  648. return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
  649. })
  650. return sortedMemberIDs
  651. }
  652. func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
  653. // sort the members by the number of partition assignments in descending order
  654. sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
  655. i := 0
  656. for partition := range partition2AllPotentialConsumers {
  657. sortedPartionIDs[i] = partition
  658. i++
  659. }
  660. sort.Slice(sortedPartionIDs, func(i, j int) bool {
  661. if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
  662. ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
  663. if ret == 0 {
  664. return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
  665. }
  666. return ret < 0
  667. }
  668. return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
  669. })
  670. return sortedPartionIDs
  671. }
  672. func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
  673. m := make(map[string][]topicPartitionAssignment, len(assignment))
  674. for memberID, subscriptions := range assignment {
  675. m[memberID] = append(subscriptions[:0:0], subscriptions...)
  676. }
  677. return m
  678. }
  679. func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
  680. curMembers := make(map[string]int)
  681. for _, cur := range partition2AllPotentialConsumers {
  682. if len(curMembers) == 0 {
  683. for _, curMembersElem := range cur {
  684. curMembers[curMembersElem]++
  685. }
  686. continue
  687. }
  688. if len(curMembers) != len(cur) {
  689. return false
  690. }
  691. yMap := make(map[string]int)
  692. for _, yElem := range cur {
  693. yMap[yElem]++
  694. }
  695. for curMembersMapKey, curMembersMapVal := range curMembers {
  696. if yMap[curMembersMapKey] != curMembersMapVal {
  697. return false
  698. }
  699. }
  700. }
  701. curPartitions := make(map[topicPartitionAssignment]int)
  702. for _, cur := range consumer2AllPotentialPartitions {
  703. if len(curPartitions) == 0 {
  704. for _, curPartitionElem := range cur {
  705. curPartitions[curPartitionElem]++
  706. }
  707. continue
  708. }
  709. if len(curPartitions) != len(cur) {
  710. return false
  711. }
  712. yMap := make(map[topicPartitionAssignment]int)
  713. for _, yElem := range cur {
  714. yMap[yElem]++
  715. }
  716. for curMembersMapKey, curMembersMapVal := range curPartitions {
  717. if yMap[curMembersMapKey] != curMembersMapVal {
  718. return false
  719. }
  720. }
  721. }
  722. return true
  723. }
  724. // We need to process subscriptions' user data with each consumer's reported generation in mind
  725. // higher generations overwrite lower generations in case of a conflict
  726. // note that a conflict could exist only if user data is for different generations
  727. func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
  728. currentAssignment := make(map[string][]topicPartitionAssignment)
  729. prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
  730. // for each partition we create a sorted map of its consumers by generation
  731. sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
  732. for memberID, meta := range members {
  733. consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
  734. if err != nil {
  735. return nil, nil, err
  736. }
  737. for _, partition := range consumerUserData.partitions() {
  738. if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
  739. if consumerUserData.hasGeneration() {
  740. if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
  741. // same partition is assigned to two consumers during the same rebalance.
  742. // log a warning and skip this record
  743. Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
  744. continue
  745. } else {
  746. consumers[consumerUserData.generation()] = memberID
  747. }
  748. } else {
  749. consumers[defaultGeneration] = memberID
  750. }
  751. } else {
  752. generation := defaultGeneration
  753. if consumerUserData.hasGeneration() {
  754. generation = consumerUserData.generation()
  755. }
  756. sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
  757. }
  758. }
  759. }
  760. // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
  761. // current and previous consumers are the last two consumers of each partition in the above sorted map
  762. for partition, consumers := range sortedPartitionConsumersByGeneration {
  763. // sort consumers by generation in decreasing order
  764. var generations []int
  765. for generation := range consumers {
  766. generations = append(generations, generation)
  767. }
  768. sort.Sort(sort.Reverse(sort.IntSlice(generations)))
  769. consumer := consumers[generations[0]]
  770. if _, exists := currentAssignment[consumer]; !exists {
  771. currentAssignment[consumer] = []topicPartitionAssignment{partition}
  772. } else {
  773. currentAssignment[consumer] = append(currentAssignment[consumer], partition)
  774. }
  775. // check for previous assignment, if any
  776. if len(generations) > 1 {
  777. prevAssignment[partition] = consumerGenerationPair{
  778. MemberID: consumers[generations[1]],
  779. Generation: generations[1],
  780. }
  781. }
  782. }
  783. return currentAssignment, prevAssignment, nil
  784. }
  785. type consumerGenerationPair struct {
  786. MemberID string
  787. Generation int
  788. }
  789. // consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
  790. type consumerPair struct {
  791. SrcMemberID string
  792. DstMemberID string
  793. }
  794. // partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
  795. type partitionMovements struct {
  796. PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
  797. Movements map[topicPartitionAssignment]consumerPair
  798. }
  799. func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
  800. pair := p.Movements[partition]
  801. delete(p.Movements, partition)
  802. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  803. delete(partitionMovementsForThisTopic[pair], partition)
  804. if len(partitionMovementsForThisTopic[pair]) == 0 {
  805. delete(partitionMovementsForThisTopic, pair)
  806. }
  807. if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
  808. delete(p.PartitionMovementsByTopic, partition.Topic)
  809. }
  810. return pair
  811. }
  812. func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
  813. p.Movements[partition] = pair
  814. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  815. p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
  816. }
  817. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  818. if _, exists := partitionMovementsForThisTopic[pair]; !exists {
  819. partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
  820. }
  821. partitionMovementsForThisTopic[pair][partition] = true
  822. }
  823. func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
  824. pair := consumerPair{
  825. SrcMemberID: oldConsumer,
  826. DstMemberID: newConsumer,
  827. }
  828. if _, exists := p.Movements[partition]; exists {
  829. // this partition has previously moved
  830. existingPair := p.removeMovementRecordOfPartition(partition)
  831. if existingPair.DstMemberID != oldConsumer {
  832. Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
  833. }
  834. if existingPair.SrcMemberID != newConsumer {
  835. // the partition is not moving back to its previous consumer
  836. p.addPartitionMovementRecord(partition, consumerPair{
  837. SrcMemberID: existingPair.SrcMemberID,
  838. DstMemberID: newConsumer,
  839. })
  840. }
  841. } else {
  842. p.addPartitionMovementRecord(partition, pair)
  843. }
  844. }
  845. func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
  846. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  847. return partition
  848. }
  849. if _, exists := p.Movements[partition]; exists {
  850. // this partition has previously moved
  851. if oldConsumer != p.Movements[partition].DstMemberID {
  852. Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
  853. }
  854. oldConsumer = p.Movements[partition].SrcMemberID
  855. }
  856. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  857. reversePair := consumerPair{
  858. SrcMemberID: newConsumer,
  859. DstMemberID: oldConsumer,
  860. }
  861. if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
  862. return partition
  863. }
  864. var reversePairPartition topicPartitionAssignment
  865. for otherPartition := range partitionMovementsForThisTopic[reversePair] {
  866. reversePairPartition = otherPartition
  867. }
  868. return reversePairPartition
  869. }
  870. func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
  871. if src == dst {
  872. return currentPath, false
  873. }
  874. if len(pairs) == 0 {
  875. return currentPath, false
  876. }
  877. for _, pair := range pairs {
  878. if src == pair.SrcMemberID && dst == pair.DstMemberID {
  879. currentPath = append(currentPath, src, dst)
  880. return currentPath, true
  881. }
  882. }
  883. for _, pair := range pairs {
  884. if pair.SrcMemberID == src {
  885. // create a deep copy of the pairs, excluding the current pair
  886. reducedSet := make([]consumerPair, len(pairs)-1)
  887. i := 0
  888. for _, p := range pairs {
  889. if p != pair {
  890. reducedSet[i] = pair
  891. i++
  892. }
  893. }
  894. currentPath = append(currentPath, pair.SrcMemberID)
  895. return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
  896. }
  897. }
  898. return currentPath, false
  899. }
  900. func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
  901. superCycle := make([]string, len(cycle)-1)
  902. for i := 0; i < len(cycle)-1; i++ {
  903. superCycle[i] = cycle[i]
  904. }
  905. superCycle = append(superCycle, cycle...)
  906. for _, foundCycle := range cycles {
  907. if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
  908. return true
  909. }
  910. }
  911. return false
  912. }
  913. func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
  914. cycles := make([][]string, 0)
  915. for _, pair := range pairs {
  916. // create a deep copy of the pairs, excluding the current pair
  917. reducedPairs := make([]consumerPair, len(pairs)-1)
  918. i := 0
  919. for _, p := range pairs {
  920. if p != pair {
  921. reducedPairs[i] = pair
  922. i++
  923. }
  924. }
  925. if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
  926. if !p.in(path, cycles) {
  927. cycles = append(cycles, path)
  928. Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
  929. }
  930. }
  931. }
  932. // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
  933. // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
  934. // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
  935. for _, cycle := range cycles {
  936. if len(cycle) == 3 {
  937. return true
  938. }
  939. }
  940. return false
  941. }
  942. func (p *partitionMovements) isSticky() bool {
  943. for topic, movements := range p.PartitionMovementsByTopic {
  944. movementPairs := make([]consumerPair, len(movements))
  945. i := 0
  946. for pair := range movements {
  947. movementPairs[i] = pair
  948. i++
  949. }
  950. if p.hasCycles(movementPairs) {
  951. Logger.Printf("Stickiness is violated for topic %s", topic)
  952. Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
  953. return false
  954. }
  955. }
  956. return true
  957. }
  958. func indexOfSubList(source []string, target []string) int {
  959. targetSize := len(target)
  960. maxCandidate := len(source) - targetSize
  961. nextCand:
  962. for candidate := 0; candidate <= maxCandidate; candidate++ {
  963. j := candidate
  964. for i := 0; i < targetSize; i++ {
  965. if target[i] != source[j] {
  966. // Element mismatch, try next cand
  967. continue nextCand
  968. }
  969. j++
  970. }
  971. // All elements of candidate matched target
  972. return candidate
  973. }
  974. return -1
  975. }
  976. type consumerGroupMember struct {
  977. id string
  978. assignments []topicPartitionAssignment
  979. }
  980. // assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
  981. // in descending order (most assignments to least assignments).
  982. type assignmentPriorityQueue []*consumerGroupMember
  983. func (pq assignmentPriorityQueue) Len() int { return len(pq) }
  984. func (pq assignmentPriorityQueue) Less(i, j int) bool {
  985. // order asssignment priority queue in descending order using assignment-count/member-id
  986. if len(pq[i].assignments) == len(pq[j].assignments) {
  987. return strings.Compare(pq[i].id, pq[j].id) > 0
  988. }
  989. return len(pq[i].assignments) > len(pq[j].assignments)
  990. }
  991. func (pq assignmentPriorityQueue) Swap(i, j int) {
  992. pq[i], pq[j] = pq[j], pq[i]
  993. }
  994. func (pq *assignmentPriorityQueue) Push(x interface{}) {
  995. member := x.(*consumerGroupMember)
  996. *pq = append(*pq, member)
  997. }
  998. func (pq *assignmentPriorityQueue) Pop() interface{} {
  999. old := *pq
  1000. n := len(old)
  1001. member := old[n-1]
  1002. *pq = old[0 : n-1]
  1003. return member
  1004. }