admin.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "strconv"
  7. "sync"
  8. "time"
  9. )
  10. // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
  11. // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
  12. // Methods with stricter requirements will specify the minimum broker version required.
  13. // You MUST call Close() on a client to avoid leaks
  14. type ClusterAdmin interface {
  15. // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
  16. // It may take several seconds after CreateTopic returns success for all the brokers
  17. // to become aware that the topic has been created. During this time, listTopics
  18. // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
  19. CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
  20. // List the topics available in the cluster with the default options.
  21. ListTopics() (map[string]TopicDetail, error)
  22. // Describe some topics in the cluster.
  23. DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
  24. // Delete a topic. It may take several seconds after the DeleteTopic to returns success
  25. // and for all the brokers to become aware that the topics are gone.
  26. // During this time, listTopics may continue to return information about the deleted topic.
  27. // If delete.topic.enable is false on the brokers, deleteTopic will mark
  28. // the topic for deletion, but not actually delete them.
  29. // This operation is supported by brokers with version 0.10.1.0 or higher.
  30. DeleteTopic(topic string) error
  31. // Increase the number of partitions of the topics according to the corresponding values.
  32. // If partitions are increased for a topic that has a key, the partition logic or ordering of
  33. // the messages will be affected. It may take several seconds after this method returns
  34. // success for all the brokers to become aware that the partitions have been created.
  35. // During this time, ClusterAdmin#describeTopics may not return information about the
  36. // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
  37. CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
  38. // Alter the replica assignment for partitions.
  39. // This operation is supported by brokers with version 2.4.0.0 or higher.
  40. AlterPartitionReassignments(topic string, assignment [][]int32) error
  41. // Provides info on ongoing partitions replica reassignments.
  42. // This operation is supported by brokers with version 2.4.0.0 or higher.
  43. ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
  44. // Delete records whose offset is smaller than the given offset of the corresponding partition.
  45. // This operation is supported by brokers with version 0.11.0.0 or higher.
  46. DeleteRecords(topic string, partitionOffsets map[int32]int64) error
  47. // Get the configuration for the specified resources.
  48. // The returned configuration includes default values and the Default is true
  49. // can be used to distinguish them from user supplied values.
  50. // Config entries where ReadOnly is true cannot be updated.
  51. // The value of config entries where Sensitive is true is always nil so
  52. // sensitive information is not disclosed.
  53. // This operation is supported by brokers with version 0.11.0.0 or higher.
  54. DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
  55. // Update the configuration for the specified resources with the default options.
  56. // This operation is supported by brokers with version 0.11.0.0 or higher.
  57. // The resources with their configs (topic is the only resource type with configs
  58. // that can be updated currently Updates are not transactional so they may succeed
  59. // for some resources while fail for others. The configs for a particular resource are updated automatically.
  60. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
  61. // Creates access control lists (ACLs) which are bound to specific resources.
  62. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  63. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
  64. // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
  65. CreateACL(resource Resource, acl Acl) error
  66. // Lists access control lists (ACLs) according to the supplied filter.
  67. // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
  68. // This operation is supported by brokers with version 0.11.0.0 or higher.
  69. ListAcls(filter AclFilter) ([]ResourceAcls, error)
  70. // Deletes access control lists (ACLs) according to the supplied filters.
  71. // This operation is not transactional so it may succeed for some ACLs while fail for others.
  72. // This operation is supported by brokers with version 0.11.0.0 or higher.
  73. DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
  74. // List the consumer groups available in the cluster.
  75. ListConsumerGroups() (map[string]string, error)
  76. // Describe the given consumer groups.
  77. DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
  78. // List the consumer group offsets available in the cluster.
  79. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
  80. // Deletes a consumer group offset
  81. DeleteConsumerGroupOffset(group string, topic string, partition int32) error
  82. // Delete a consumer group.
  83. DeleteConsumerGroup(group string) error
  84. // Get information about the nodes in the cluster
  85. DescribeCluster() (brokers []*Broker, controllerID int32, err error)
  86. // Get information about all log directories on the given set of brokers
  87. DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
  88. // Get information about SCRAM users
  89. DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
  90. // Delete SCRAM users
  91. DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
  92. // Upsert SCRAM users
  93. UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
  94. // Get client quota configurations corresponding to the specified filter.
  95. // This operation is supported by brokers with version 2.6.0.0 or higher.
  96. DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
  97. // Alters client quota configurations with the specified alterations.
  98. // This operation is supported by brokers with version 2.6.0.0 or higher.
  99. AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
  100. // Controller returns the cluster controller broker. It will return a
  101. // locally cached value if it's available.
  102. Controller() (*Broker, error)
  103. // Close shuts down the admin and closes underlying client.
  104. Close() error
  105. }
// clusterAdmin is the ClusterAdmin implementation returned by
// NewClusterAdminFromClient. It issues every request through the wrapped client.
type clusterAdmin struct {
	// client supplies brokers, the controller and group coordinators; it is
	// closed by Close.
	client Client
	// conf is the configuration obtained from client.Config() at construction.
	conf *Config
}
  110. // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
  111. func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
  112. client, err := NewClient(addrs, conf)
  113. if err != nil {
  114. return nil, err
  115. }
  116. return NewClusterAdminFromClient(client)
  117. }
  118. // NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
  119. // Note that underlying client will also be closed on admin's Close() call.
  120. func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
  121. // make sure we can retrieve the controller
  122. _, err := client.Controller()
  123. if err != nil {
  124. return nil, err
  125. }
  126. ca := &clusterAdmin{
  127. client: client,
  128. conf: client.Config(),
  129. }
  130. return ca, nil
  131. }
// Close closes the underlying client and returns the error, if any, reported
// by that close.
func (ca *clusterAdmin) Close() error {
	return ca.client.Close()
}
// Controller returns the cluster controller broker as reported by the
// underlying client's Controller method.
func (ca *clusterAdmin) Controller() (*Broker, error) {
	return ca.client.Controller()
}
// refreshController delegates to the underlying client's RefreshController
// and returns its result. The admin methods call it after a broker answers
// with ErrNotController, before the request is retried.
func (ca *clusterAdmin) refreshController() (*Broker, error) {
	return ca.client.RefreshController()
}
  141. // isErrNoController returns `true` if the given error type unwraps to an
  142. // `ErrNotController` response from Kafka
  143. func isErrNoController(err error) bool {
  144. switch e := err.(type) {
  145. case *TopicError:
  146. return e.Err == ErrNotController
  147. case *TopicPartitionError:
  148. return e.Err == ErrNotController
  149. case KError:
  150. return e == ErrNotController
  151. }
  152. return false
  153. }
  154. // retryOnError will repeatedly call the given (error-returning) func in the
  155. // case that its response is non-nil and retryable (as determined by the
  156. // provided retryable func) up to the maximum number of tries permitted by
  157. // the admin client configuration
  158. func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
  159. var err error
  160. for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
  161. err = fn()
  162. if err == nil || !retryable(err) {
  163. return err
  164. }
  165. Logger.Printf(
  166. "admin/request retrying after %dms... (%d attempts remaining)\n",
  167. ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
  168. time.Sleep(ca.conf.Admin.Retry.Backoff)
  169. continue
  170. }
  171. return err
  172. }
  173. func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
  174. if topic == "" {
  175. return ErrInvalidTopic
  176. }
  177. if detail == nil {
  178. return errors.New("you must specify topic details")
  179. }
  180. topicDetails := make(map[string]*TopicDetail)
  181. topicDetails[topic] = detail
  182. request := &CreateTopicsRequest{
  183. TopicDetails: topicDetails,
  184. ValidateOnly: validateOnly,
  185. Timeout: ca.conf.Admin.Timeout,
  186. }
  187. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  188. request.Version = 1
  189. }
  190. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  191. request.Version = 2
  192. }
  193. return ca.retryOnError(isErrNoController, func() error {
  194. b, err := ca.Controller()
  195. if err != nil {
  196. return err
  197. }
  198. rsp, err := b.CreateTopics(request)
  199. if err != nil {
  200. return err
  201. }
  202. topicErr, ok := rsp.TopicErrors[topic]
  203. if !ok {
  204. return ErrIncompleteResponse
  205. }
  206. if topicErr.Err != ErrNoError {
  207. if topicErr.Err == ErrNotController {
  208. _, _ = ca.refreshController()
  209. }
  210. return topicErr
  211. }
  212. return nil
  213. })
  214. }
  215. func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
  216. controller, err := ca.Controller()
  217. if err != nil {
  218. return nil, err
  219. }
  220. request := &MetadataRequest{
  221. Topics: topics,
  222. AllowAutoTopicCreation: false,
  223. }
  224. if ca.conf.Version.IsAtLeast(V1_0_0_0) {
  225. request.Version = 5
  226. } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  227. request.Version = 4
  228. }
  229. response, err := controller.GetMetadata(request)
  230. if err != nil {
  231. return nil, err
  232. }
  233. return response.Topics, nil
  234. }
  235. func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
  236. controller, err := ca.Controller()
  237. if err != nil {
  238. return nil, int32(0), err
  239. }
  240. request := &MetadataRequest{
  241. Topics: []string{},
  242. }
  243. if ca.conf.Version.IsAtLeast(V0_10_0_0) {
  244. request.Version = 1
  245. }
  246. response, err := controller.GetMetadata(request)
  247. if err != nil {
  248. return nil, int32(0), err
  249. }
  250. return response.Brokers, response.ControllerID, nil
  251. }
  252. func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
  253. brokers := ca.client.Brokers()
  254. for _, b := range brokers {
  255. if b.ID() == id {
  256. return b, nil
  257. }
  258. }
  259. return nil, fmt.Errorf("could not find broker id %d", id)
  260. }
  261. func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
  262. brokers := ca.client.Brokers()
  263. if len(brokers) > 0 {
  264. index := rand.Intn(len(brokers))
  265. return brokers[index], nil
  266. }
  267. return nil, errors.New("no available broker")
  268. }
  269. func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
  270. // In order to build TopicDetails we need to first get the list of all
  271. // topics using a MetadataRequest and then get their configs using a
  272. // DescribeConfigsRequest request. To avoid sending many requests to the
  273. // broker, we use a single DescribeConfigsRequest.
  274. // Send the all-topic MetadataRequest
  275. b, err := ca.findAnyBroker()
  276. if err != nil {
  277. return nil, err
  278. }
  279. _ = b.Open(ca.client.Config())
  280. metadataReq := &MetadataRequest{}
  281. metadataResp, err := b.GetMetadata(metadataReq)
  282. if err != nil {
  283. return nil, err
  284. }
  285. topicsDetailsMap := make(map[string]TopicDetail)
  286. var describeConfigsResources []*ConfigResource
  287. for _, topic := range metadataResp.Topics {
  288. topicDetails := TopicDetail{
  289. NumPartitions: int32(len(topic.Partitions)),
  290. }
  291. if len(topic.Partitions) > 0 {
  292. topicDetails.ReplicaAssignment = map[int32][]int32{}
  293. for _, partition := range topic.Partitions {
  294. topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
  295. }
  296. topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
  297. }
  298. topicsDetailsMap[topic.Name] = topicDetails
  299. // we populate the resources we want to describe from the MetadataResponse
  300. topicResource := ConfigResource{
  301. Type: TopicResource,
  302. Name: topic.Name,
  303. }
  304. describeConfigsResources = append(describeConfigsResources, &topicResource)
  305. }
  306. // Send the DescribeConfigsRequest
  307. describeConfigsReq := &DescribeConfigsRequest{
  308. Resources: describeConfigsResources,
  309. }
  310. if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  311. describeConfigsReq.Version = 1
  312. }
  313. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  314. describeConfigsReq.Version = 2
  315. }
  316. describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
  317. if err != nil {
  318. return nil, err
  319. }
  320. for _, resource := range describeConfigsResp.Resources {
  321. topicDetails := topicsDetailsMap[resource.Name]
  322. topicDetails.ConfigEntries = make(map[string]*string)
  323. for _, entry := range resource.Configs {
  324. // only include non-default non-sensitive config
  325. // (don't actually think topic config will ever be sensitive)
  326. if entry.Default || entry.Sensitive {
  327. continue
  328. }
  329. topicDetails.ConfigEntries[entry.Name] = &entry.Value
  330. }
  331. topicsDetailsMap[resource.Name] = topicDetails
  332. }
  333. return topicsDetailsMap, nil
  334. }
  335. func (ca *clusterAdmin) DeleteTopic(topic string) error {
  336. if topic == "" {
  337. return ErrInvalidTopic
  338. }
  339. request := &DeleteTopicsRequest{
  340. Topics: []string{topic},
  341. Timeout: ca.conf.Admin.Timeout,
  342. }
  343. if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  344. request.Version = 1
  345. }
  346. return ca.retryOnError(isErrNoController, func() error {
  347. b, err := ca.Controller()
  348. if err != nil {
  349. return err
  350. }
  351. rsp, err := b.DeleteTopics(request)
  352. if err != nil {
  353. return err
  354. }
  355. topicErr, ok := rsp.TopicErrorCodes[topic]
  356. if !ok {
  357. return ErrIncompleteResponse
  358. }
  359. if topicErr != ErrNoError {
  360. if topicErr == ErrNotController {
  361. _, _ = ca.refreshController()
  362. }
  363. return topicErr
  364. }
  365. return nil
  366. })
  367. }
  368. func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
  369. if topic == "" {
  370. return ErrInvalidTopic
  371. }
  372. topicPartitions := make(map[string]*TopicPartition)
  373. topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
  374. request := &CreatePartitionsRequest{
  375. TopicPartitions: topicPartitions,
  376. Timeout: ca.conf.Admin.Timeout,
  377. ValidateOnly: validateOnly,
  378. }
  379. return ca.retryOnError(isErrNoController, func() error {
  380. b, err := ca.Controller()
  381. if err != nil {
  382. return err
  383. }
  384. rsp, err := b.CreatePartitions(request)
  385. if err != nil {
  386. return err
  387. }
  388. topicErr, ok := rsp.TopicPartitionErrors[topic]
  389. if !ok {
  390. return ErrIncompleteResponse
  391. }
  392. if topicErr.Err != ErrNoError {
  393. if topicErr.Err == ErrNotController {
  394. _, _ = ca.refreshController()
  395. }
  396. return topicErr
  397. }
  398. return nil
  399. })
  400. }
  401. func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
  402. if topic == "" {
  403. return ErrInvalidTopic
  404. }
  405. request := &AlterPartitionReassignmentsRequest{
  406. TimeoutMs: int32(60000),
  407. Version: int16(0),
  408. }
  409. for i := 0; i < len(assignment); i++ {
  410. request.AddBlock(topic, int32(i), assignment[i])
  411. }
  412. return ca.retryOnError(isErrNoController, func() error {
  413. b, err := ca.Controller()
  414. if err != nil {
  415. return err
  416. }
  417. errs := make([]error, 0)
  418. rsp, err := b.AlterPartitionReassignments(request)
  419. if err != nil {
  420. errs = append(errs, err)
  421. } else {
  422. if rsp.ErrorCode > 0 {
  423. errs = append(errs, errors.New(rsp.ErrorCode.Error()))
  424. }
  425. for topic, topicErrors := range rsp.Errors {
  426. for partition, partitionError := range topicErrors {
  427. if partitionError.errorCode != ErrNoError {
  428. errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
  429. errs = append(errs, errors.New(errStr))
  430. }
  431. }
  432. }
  433. }
  434. if len(errs) > 0 {
  435. return ErrReassignPartitions{MultiError{&errs}}
  436. }
  437. return nil
  438. })
  439. }
  440. func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
  441. if topic == "" {
  442. return nil, ErrInvalidTopic
  443. }
  444. request := &ListPartitionReassignmentsRequest{
  445. TimeoutMs: int32(60000),
  446. Version: int16(0),
  447. }
  448. request.AddBlock(topic, partitions)
  449. b, err := ca.Controller()
  450. if err != nil {
  451. return nil, err
  452. }
  453. _ = b.Open(ca.client.Config())
  454. rsp, err := b.ListPartitionReassignments(request)
  455. if err == nil && rsp != nil {
  456. return rsp.TopicStatus, nil
  457. } else {
  458. return nil, err
  459. }
  460. }
  461. func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
  462. if topic == "" {
  463. return ErrInvalidTopic
  464. }
  465. partitionPerBroker := make(map[*Broker][]int32)
  466. for partition := range partitionOffsets {
  467. broker, err := ca.client.Leader(topic, partition)
  468. if err != nil {
  469. return err
  470. }
  471. partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
  472. }
  473. errs := make([]error, 0)
  474. for broker, partitions := range partitionPerBroker {
  475. topics := make(map[string]*DeleteRecordsRequestTopic)
  476. recordsToDelete := make(map[int32]int64)
  477. for _, p := range partitions {
  478. recordsToDelete[p] = partitionOffsets[p]
  479. }
  480. topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
  481. request := &DeleteRecordsRequest{
  482. Topics: topics,
  483. Timeout: ca.conf.Admin.Timeout,
  484. }
  485. rsp, err := broker.DeleteRecords(request)
  486. if err != nil {
  487. errs = append(errs, err)
  488. } else {
  489. deleteRecordsResponseTopic, ok := rsp.Topics[topic]
  490. if !ok {
  491. errs = append(errs, ErrIncompleteResponse)
  492. } else {
  493. for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
  494. if deleteRecordsResponsePartition.Err != ErrNoError {
  495. errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
  496. }
  497. }
  498. }
  499. }
  500. }
  501. if len(errs) > 0 {
  502. return ErrDeleteRecords{MultiError{&errs}}
  503. }
  504. // todo since we are dealing with couple of partitions it would be good if we return slice of errors
  505. // for each partition instead of one error
  506. return nil
  507. }
  508. // Returns a bool indicating whether the resource request needs to go to a
  509. // specific broker
  510. func dependsOnSpecificNode(resource ConfigResource) bool {
  511. return (resource.Type == BrokerResource && resource.Name != "") ||
  512. resource.Type == BrokerLoggerResource
  513. }
  514. func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
  515. var entries []ConfigEntry
  516. var resources []*ConfigResource
  517. resources = append(resources, &resource)
  518. request := &DescribeConfigsRequest{
  519. Resources: resources,
  520. }
  521. if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  522. request.Version = 1
  523. }
  524. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  525. request.Version = 2
  526. }
  527. var (
  528. b *Broker
  529. err error
  530. )
  531. // DescribeConfig of broker/broker logger must be sent to the broker in question
  532. if dependsOnSpecificNode(resource) {
  533. var id int64
  534. id, err = strconv.ParseInt(resource.Name, 10, 32)
  535. if err != nil {
  536. return nil, err
  537. }
  538. b, err = ca.findBroker(int32(id))
  539. } else {
  540. b, err = ca.findAnyBroker()
  541. }
  542. if err != nil {
  543. return nil, err
  544. }
  545. _ = b.Open(ca.client.Config())
  546. rsp, err := b.DescribeConfigs(request)
  547. if err != nil {
  548. return nil, err
  549. }
  550. for _, rspResource := range rsp.Resources {
  551. if rspResource.Name == resource.Name {
  552. if rspResource.ErrorMsg != "" {
  553. return nil, errors.New(rspResource.ErrorMsg)
  554. }
  555. if rspResource.ErrorCode != 0 {
  556. return nil, KError(rspResource.ErrorCode)
  557. }
  558. for _, cfgEntry := range rspResource.Configs {
  559. entries = append(entries, *cfgEntry)
  560. }
  561. }
  562. }
  563. return entries, nil
  564. }
  565. func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
  566. var resources []*AlterConfigsResource
  567. resources = append(resources, &AlterConfigsResource{
  568. Type: resourceType,
  569. Name: name,
  570. ConfigEntries: entries,
  571. })
  572. request := &AlterConfigsRequest{
  573. Resources: resources,
  574. ValidateOnly: validateOnly,
  575. }
  576. var (
  577. b *Broker
  578. err error
  579. )
  580. // AlterConfig of broker/broker logger must be sent to the broker in question
  581. if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
  582. var id int64
  583. id, err = strconv.ParseInt(name, 10, 32)
  584. if err != nil {
  585. return err
  586. }
  587. b, err = ca.findBroker(int32(id))
  588. } else {
  589. b, err = ca.findAnyBroker()
  590. }
  591. if err != nil {
  592. return err
  593. }
  594. _ = b.Open(ca.client.Config())
  595. rsp, err := b.AlterConfigs(request)
  596. if err != nil {
  597. return err
  598. }
  599. for _, rspResource := range rsp.Resources {
  600. if rspResource.Name == name {
  601. if rspResource.ErrorMsg != "" {
  602. return errors.New(rspResource.ErrorMsg)
  603. }
  604. if rspResource.ErrorCode != 0 {
  605. return KError(rspResource.ErrorCode)
  606. }
  607. }
  608. }
  609. return nil
  610. }
  611. func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
  612. var acls []*AclCreation
  613. acls = append(acls, &AclCreation{resource, acl})
  614. request := &CreateAclsRequest{AclCreations: acls}
  615. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  616. request.Version = 1
  617. }
  618. b, err := ca.Controller()
  619. if err != nil {
  620. return err
  621. }
  622. _, err = b.CreateAcls(request)
  623. return err
  624. }
  625. func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
  626. request := &DescribeAclsRequest{AclFilter: filter}
  627. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  628. request.Version = 1
  629. }
  630. b, err := ca.Controller()
  631. if err != nil {
  632. return nil, err
  633. }
  634. rsp, err := b.DescribeAcls(request)
  635. if err != nil {
  636. return nil, err
  637. }
  638. var lAcls []ResourceAcls
  639. for _, rAcl := range rsp.ResourceAcls {
  640. lAcls = append(lAcls, *rAcl)
  641. }
  642. return lAcls, nil
  643. }
  644. func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
  645. var filters []*AclFilter
  646. filters = append(filters, &filter)
  647. request := &DeleteAclsRequest{Filters: filters}
  648. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  649. request.Version = 1
  650. }
  651. b, err := ca.Controller()
  652. if err != nil {
  653. return nil, err
  654. }
  655. rsp, err := b.DeleteAcls(request)
  656. if err != nil {
  657. return nil, err
  658. }
  659. var mAcls []MatchingAcl
  660. for _, fr := range rsp.FilterResponses {
  661. for _, mACL := range fr.MatchingAcls {
  662. mAcls = append(mAcls, *mACL)
  663. }
  664. }
  665. return mAcls, nil
  666. }
  667. func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
  668. groupsPerBroker := make(map[*Broker][]string)
  669. for _, group := range groups {
  670. controller, err := ca.client.Coordinator(group)
  671. if err != nil {
  672. return nil, err
  673. }
  674. groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
  675. }
  676. for broker, brokerGroups := range groupsPerBroker {
  677. response, err := broker.DescribeGroups(&DescribeGroupsRequest{
  678. Groups: brokerGroups,
  679. })
  680. if err != nil {
  681. return nil, err
  682. }
  683. result = append(result, response.Groups...)
  684. }
  685. return result, nil
  686. }
  687. func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
  688. allGroups = make(map[string]string)
  689. // Query brokers in parallel, since we have to query *all* brokers
  690. brokers := ca.client.Brokers()
  691. groupMaps := make(chan map[string]string, len(brokers))
  692. errChan := make(chan error, len(brokers))
  693. wg := sync.WaitGroup{}
  694. for _, b := range brokers {
  695. wg.Add(1)
  696. go func(b *Broker, conf *Config) {
  697. defer wg.Done()
  698. _ = b.Open(conf) // Ensure that broker is opened
  699. response, err := b.ListGroups(&ListGroupsRequest{})
  700. if err != nil {
  701. errChan <- err
  702. return
  703. }
  704. groups := make(map[string]string)
  705. for group, typ := range response.Groups {
  706. groups[group] = typ
  707. }
  708. groupMaps <- groups
  709. }(b, ca.conf)
  710. }
  711. wg.Wait()
  712. close(groupMaps)
  713. close(errChan)
  714. for groupMap := range groupMaps {
  715. for group, protocolType := range groupMap {
  716. allGroups[group] = protocolType
  717. }
  718. }
  719. // Intentionally return only the first error for simplicity
  720. err = <-errChan
  721. return
  722. }
  723. func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
  724. coordinator, err := ca.client.Coordinator(group)
  725. if err != nil {
  726. return nil, err
  727. }
  728. request := &OffsetFetchRequest{
  729. ConsumerGroup: group,
  730. partitions: topicPartitions,
  731. }
  732. if ca.conf.Version.IsAtLeast(V0_10_2_0) {
  733. request.Version = 2
  734. } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
  735. request.Version = 1
  736. }
  737. return coordinator.FetchOffset(request)
  738. }
  739. func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
  740. coordinator, err := ca.client.Coordinator(group)
  741. if err != nil {
  742. return err
  743. }
  744. request := &DeleteOffsetsRequest{
  745. Group: group,
  746. partitions: map[string][]int32{
  747. topic: {partition},
  748. },
  749. }
  750. resp, err := coordinator.DeleteOffsets(request)
  751. if err != nil {
  752. return err
  753. }
  754. if resp.ErrorCode != ErrNoError {
  755. return resp.ErrorCode
  756. }
  757. if resp.Errors[topic][partition] != ErrNoError {
  758. return resp.Errors[topic][partition]
  759. }
  760. return nil
  761. }
  762. func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
  763. coordinator, err := ca.client.Coordinator(group)
  764. if err != nil {
  765. return err
  766. }
  767. request := &DeleteGroupsRequest{
  768. Groups: []string{group},
  769. }
  770. resp, err := coordinator.DeleteGroups(request)
  771. if err != nil {
  772. return err
  773. }
  774. groupErr, ok := resp.GroupErrorCodes[group]
  775. if !ok {
  776. return ErrIncompleteResponse
  777. }
  778. if groupErr != ErrNoError {
  779. return groupErr
  780. }
  781. return nil
  782. }
  783. func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
  784. allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
  785. // Query brokers in parallel, since we may have to query multiple brokers
  786. logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
  787. errChan := make(chan error, len(brokerIds))
  788. wg := sync.WaitGroup{}
  789. for _, b := range brokerIds {
  790. wg.Add(1)
  791. broker, err := ca.findBroker(b)
  792. if err != nil {
  793. Logger.Printf("Unable to find broker with ID = %v\n", b)
  794. continue
  795. }
  796. go func(b *Broker, conf *Config) {
  797. defer wg.Done()
  798. _ = b.Open(conf) // Ensure that broker is opened
  799. response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
  800. if err != nil {
  801. errChan <- err
  802. return
  803. }
  804. logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
  805. logDirs[b.ID()] = response.LogDirs
  806. logDirsMaps <- logDirs
  807. }(broker, ca.conf)
  808. }
  809. wg.Wait()
  810. close(logDirsMaps)
  811. close(errChan)
  812. for logDirsMap := range logDirsMaps {
  813. for id, logDirs := range logDirsMap {
  814. allLogDirs[id] = logDirs
  815. }
  816. }
  817. // Intentionally return only the first error for simplicity
  818. err = <-errChan
  819. return
  820. }
  821. func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
  822. req := &DescribeUserScramCredentialsRequest{}
  823. for _, u := range users {
  824. req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
  825. Name: u,
  826. })
  827. }
  828. b, err := ca.Controller()
  829. if err != nil {
  830. return nil, err
  831. }
  832. rsp, err := b.DescribeUserScramCredentials(req)
  833. if err != nil {
  834. return nil, err
  835. }
  836. return rsp.Results, nil
  837. }
  838. func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
  839. res, err := ca.AlterUserScramCredentials(upsert, nil)
  840. if err != nil {
  841. return nil, err
  842. }
  843. return res, nil
  844. }
  845. func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
  846. res, err := ca.AlterUserScramCredentials(nil, delete)
  847. if err != nil {
  848. return nil, err
  849. }
  850. return res, nil
  851. }
  852. func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
  853. req := &AlterUserScramCredentialsRequest{
  854. Deletions: d,
  855. Upsertions: u,
  856. }
  857. b, err := ca.Controller()
  858. if err != nil {
  859. return nil, err
  860. }
  861. rsp, err := b.AlterUserScramCredentials(req)
  862. if err != nil {
  863. return nil, err
  864. }
  865. return rsp.Results, nil
  866. }
  867. // Describe All : use an empty/nil components slice + strict = false
  868. // Contains components: strict = false
  869. // Contains only components: strict = true
  870. func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
  871. request := &DescribeClientQuotasRequest{
  872. Components: components,
  873. Strict: strict,
  874. }
  875. b, err := ca.Controller()
  876. if err != nil {
  877. return nil, err
  878. }
  879. rsp, err := b.DescribeClientQuotas(request)
  880. if err != nil {
  881. return nil, err
  882. }
  883. if rsp.ErrorMsg != nil {
  884. return nil, errors.New(*rsp.ErrorMsg)
  885. }
  886. if rsp.ErrorCode != ErrNoError {
  887. return nil, rsp.ErrorCode
  888. }
  889. return rsp.Entries, nil
  890. }
  891. func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
  892. entry := AlterClientQuotasEntry{
  893. Entity: entity,
  894. Ops: []ClientQuotasOp{op},
  895. }
  896. request := &AlterClientQuotasRequest{
  897. Entries: []AlterClientQuotasEntry{entry},
  898. ValidateOnly: validateOnly,
  899. }
  900. b, err := ca.Controller()
  901. if err != nil {
  902. return err
  903. }
  904. rsp, err := b.AlterClientQuotas(request)
  905. if err != nil {
  906. return err
  907. }
  908. for _, entry := range rsp.Entries {
  909. if entry.ErrorCode != ErrNoError {
  910. return entry.ErrorCode
  911. }
  912. }
  913. return nil
  914. }