broker.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/rcrowley/go-metrics"
  15. )
  16. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  17. type Broker struct {
  18. conf *Config
  19. rack *string
  20. id int32
  21. addr string
  22. correlationID int32
  23. conn net.Conn
  24. connErr error
  25. lock sync.Mutex
  26. opened int32
  27. responses chan responsePromise
  28. done chan bool
  29. registeredMetrics []string
  30. incomingByteRate metrics.Meter
  31. requestRate metrics.Meter
  32. requestSize metrics.Histogram
  33. requestLatency metrics.Histogram
  34. outgoingByteRate metrics.Meter
  35. responseRate metrics.Meter
  36. responseSize metrics.Histogram
  37. requestsInFlight metrics.Counter
  38. brokerIncomingByteRate metrics.Meter
  39. brokerRequestRate metrics.Meter
  40. brokerRequestSize metrics.Histogram
  41. brokerRequestLatency metrics.Histogram
  42. brokerOutgoingByteRate metrics.Meter
  43. brokerResponseRate metrics.Meter
  44. brokerResponseSize metrics.Histogram
  45. brokerRequestsInFlight metrics.Counter
  46. brokerThrottleTime metrics.Histogram
  47. kerberosAuthenticator GSSAPIKerberosAuth
  48. }
  // SASLMechanism names the SASL mechanism that the client uses to
  // authenticate with the broker.
  type SASLMechanism string
  // Names of the supported SASL mechanisms.
  const (
  	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
  	SASLTypeOAuth = "OAUTHBEARER"
  	// SASLTypePlaintext represents the SASL/PLAIN mechanism
  	SASLTypePlaintext = "PLAIN"
  	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
  	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
  	// SASLTypeGSSAPI represents the GSSAPI mechanism.
  	SASLTypeGSSAPI = "GSSAPI"
  )

  // Versions of the Kafka SASL handshake protocol.
  const (
  	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  	// server negotiate SASL auth using opaque packets.
  	SASLHandshakeV0 int16 = 0
  	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
  	SASLHandshakeV1 int16 = 1
  )

  // SASLExtKeyAuth is the reserved extension key name sent as part of the
  // SASL/OAUTHBEARER initial client response
  const SASLExtKeyAuth = "auth"
  // AccessToken bundles the access token used to authenticate a
  // SASL/OAUTHBEARER client with its associated metadata.
  type AccessToken struct {
  	// Token is the access token payload.
  	Token string

  	// Extensions is an optional map of arbitrary key-value pairs that can be
  	// sent with the SASL/OAUTHBEARER initial client response. The SASL server
  	// ignores these values if they are unexpected. This feature is only
  	// supported by Kafka >= 2.1.0.
  	Extensions map[string]string
  }
  82. // AccessTokenProvider is the interface that encapsulates how implementors
  83. // can generate access tokens for Kafka broker authentication.
  84. type AccessTokenProvider interface {
  85. // Token returns an access token. The implementation should ensure token
  86. // reuse so that multiple calls at connect time do not create multiple
  87. // tokens. The implementation should also periodically refresh the token in
  88. // order to guarantee that each call returns an unexpired token. This
  89. // method should not block indefinitely--a timeout error should be returned
  90. // after a short period of inactivity so that the broker connection logic
  91. // can log debugging information and retry.
  92. Token() (*AccessToken, error)
  93. }
  // SCRAMClient is an interface to a SCRAM client implementation.
  type SCRAMClient interface {
  	// Begin prepares the client for the SCRAM exchange with the server,
  	// using the given user name and password.
  	Begin(userName, password, authzID string) error

  	// Step advances the client through the SCRAM exchange. It is called
  	// repeatedly until it returns an error or `Done` returns true.
  	Step(challenge string) (response string, err error)

  	// Done should return true once the SCRAM conversation is over.
  	Done() bool
  }
  // responsePromise describes a request for which a response is awaited. The
  // result is delivered on exactly one of the two channels: the raw response
  // on packets, or the failure on errors.
  //
  // The field order must not change: send builds this struct with a
  // positional literal.
  type responsePromise struct {
  	// requestTime is the time captured by send before the request is written.
  	requestTime time.Time
  	// correlationID is the correlation ID the request was sent with.
  	correlationID int32
  	// headerVersion is the header version of the expected response.
  	headerVersion int16

  	packets chan []byte
  	errors  chan error
  }
  114. // NewBroker creates and returns a Broker targeting the given host:port address.
  115. // This does not attempt to actually connect, you have to call Open() for that.
  116. func NewBroker(addr string) *Broker {
  117. return &Broker{id: -1, addr: addr}
  118. }
  119. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  120. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  121. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  122. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  123. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  124. func (b *Broker) Open(conf *Config) error {
  125. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  126. return ErrAlreadyConnected
  127. }
  128. if conf == nil {
  129. conf = NewConfig()
  130. }
  131. err := conf.Validate()
  132. if err != nil {
  133. return err
  134. }
  135. usingApiVersionsRequests := conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest
  136. b.lock.Lock()
  137. go withRecover(func() {
  138. defer func() {
  139. b.lock.Unlock()
  140. // Send an ApiVersionsRequest to identify the client (KIP-511).
  141. // Ideally Sarama would use the response to control protocol versions,
  142. // but for now just fire-and-forget just to send
  143. if usingApiVersionsRequests {
  144. _, err = b.ApiVersions(&ApiVersionsRequest{
  145. Version: 3,
  146. ClientSoftwareName: defaultClientSoftwareName,
  147. ClientSoftwareVersion: version(),
  148. })
  149. if err != nil {
  150. Logger.Printf("Error while sending ApiVersionsRequest to broker %s: %s\n", b.addr, err)
  151. }
  152. }
  153. }()
  154. dialer := conf.getDialer()
  155. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  156. if b.connErr != nil {
  157. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  158. b.conn = nil
  159. atomic.StoreInt32(&b.opened, 0)
  160. return
  161. }
  162. if conf.Net.TLS.Enable {
  163. b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
  164. }
  165. b.conn = newBufConn(b.conn)
  166. b.conf = conf
  167. // Create or reuse the global metrics shared between brokers
  168. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  169. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  170. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  171. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  172. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  173. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  174. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  175. b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
  176. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  177. // the same id (-1) and are already exposed through the global metrics above
  178. if b.id >= 0 && !metrics.UseNilMetrics {
  179. b.registerMetrics()
  180. }
  181. if conf.Net.SASL.Enable {
  182. b.connErr = b.authenticateViaSASL()
  183. if b.connErr != nil {
  184. err = b.conn.Close()
  185. if err == nil {
  186. DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
  187. } else {
  188. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  189. }
  190. b.conn = nil
  191. atomic.StoreInt32(&b.opened, 0)
  192. return
  193. }
  194. }
  195. b.done = make(chan bool)
  196. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  197. if b.id >= 0 {
  198. DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  199. } else {
  200. DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  201. }
  202. go withRecover(b.responseReceiver)
  203. })
  204. return nil
  205. }
  206. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  207. // connected but it had tried to connect, the error from that connection attempt is also returned.
  208. func (b *Broker) Connected() (bool, error) {
  209. b.lock.Lock()
  210. defer b.lock.Unlock()
  211. return b.conn != nil, b.connErr
  212. }
  213. // Close closes the broker resources
  214. func (b *Broker) Close() error {
  215. b.lock.Lock()
  216. defer b.lock.Unlock()
  217. if b.conn == nil {
  218. return ErrNotConnected
  219. }
  220. close(b.responses)
  221. <-b.done
  222. err := b.conn.Close()
  223. b.conn = nil
  224. b.connErr = nil
  225. b.done = nil
  226. b.responses = nil
  227. b.unregisterMetrics()
  228. if err == nil {
  229. DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
  230. } else {
  231. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  232. }
  233. atomic.StoreInt32(&b.opened, 0)
  234. return err
  235. }
  236. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  237. func (b *Broker) ID() int32 {
  238. return b.id
  239. }
  240. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  241. func (b *Broker) Addr() string {
  242. return b.addr
  243. }
  244. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  245. // empty string if it is not known. The returned value corresponds to the
  246. // broker's broker.rack configuration setting. Requires protocol version to be
  247. // at least v0.10.0.0.
  248. func (b *Broker) Rack() string {
  249. if b.rack == nil {
  250. return ""
  251. }
  252. return *b.rack
  253. }
  254. // GetMetadata send a metadata request and returns a metadata response or error
  255. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  256. response := new(MetadataResponse)
  257. err := b.sendAndReceive(request, response)
  258. if err != nil {
  259. return nil, err
  260. }
  261. return response, nil
  262. }
  263. // GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
  264. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  265. response := new(ConsumerMetadataResponse)
  266. err := b.sendAndReceive(request, response)
  267. if err != nil {
  268. return nil, err
  269. }
  270. return response, nil
  271. }
  272. // FindCoordinator sends a find coordinate request and returns a response or error
  273. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  274. response := new(FindCoordinatorResponse)
  275. err := b.sendAndReceive(request, response)
  276. if err != nil {
  277. return nil, err
  278. }
  279. return response, nil
  280. }
  281. // GetAvailableOffsets return an offset response or error
  282. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  283. response := new(OffsetResponse)
  284. err := b.sendAndReceive(request, response)
  285. if err != nil {
  286. return nil, err
  287. }
  288. return response, nil
  289. }
  290. // Produce returns a produce response or error
  291. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  292. var (
  293. response *ProduceResponse
  294. err error
  295. )
  296. if request.RequiredAcks == NoResponse {
  297. err = b.sendAndReceive(request, nil)
  298. } else {
  299. response = new(ProduceResponse)
  300. err = b.sendAndReceive(request, response)
  301. if response.ThrottleTime != time.Duration(0) {
  302. DebugLogger.Printf(
  303. "producer/broker/%d ProduceResponse throttled %v\n",
  304. b.ID(), response.ThrottleTime)
  305. if b.brokerThrottleTime != nil {
  306. throttleTimeInMs := int64(response.ThrottleTime / time.Millisecond)
  307. b.brokerThrottleTime.Update(throttleTimeInMs)
  308. }
  309. }
  310. }
  311. if err != nil {
  312. return nil, err
  313. }
  314. return response, nil
  315. }
  316. // Fetch returns a FetchResponse or error
  317. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  318. response := new(FetchResponse)
  319. err := b.sendAndReceive(request, response)
  320. if err != nil {
  321. return nil, err
  322. }
  323. return response, nil
  324. }
  325. // CommitOffset return an Offset commit response or error
  326. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  327. response := new(OffsetCommitResponse)
  328. err := b.sendAndReceive(request, response)
  329. if err != nil {
  330. return nil, err
  331. }
  332. return response, nil
  333. }
  334. // FetchOffset returns an offset fetch response or error
  335. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  336. response := new(OffsetFetchResponse)
  337. response.Version = request.Version // needed to handle the two header versions
  338. err := b.sendAndReceive(request, response)
  339. if err != nil {
  340. return nil, err
  341. }
  342. return response, nil
  343. }
  344. // JoinGroup returns a join group response or error
  345. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  346. response := new(JoinGroupResponse)
  347. err := b.sendAndReceive(request, response)
  348. if err != nil {
  349. return nil, err
  350. }
  351. return response, nil
  352. }
  353. // SyncGroup returns a sync group response or error
  354. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  355. response := new(SyncGroupResponse)
  356. err := b.sendAndReceive(request, response)
  357. if err != nil {
  358. return nil, err
  359. }
  360. return response, nil
  361. }
  362. // LeaveGroup return a leave group response or error
  363. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  364. response := new(LeaveGroupResponse)
  365. err := b.sendAndReceive(request, response)
  366. if err != nil {
  367. return nil, err
  368. }
  369. return response, nil
  370. }
  371. // Heartbeat returns a heartbeat response or error
  372. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  373. response := new(HeartbeatResponse)
  374. err := b.sendAndReceive(request, response)
  375. if err != nil {
  376. return nil, err
  377. }
  378. return response, nil
  379. }
  380. // ListGroups return a list group response or error
  381. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  382. response := new(ListGroupsResponse)
  383. err := b.sendAndReceive(request, response)
  384. if err != nil {
  385. return nil, err
  386. }
  387. return response, nil
  388. }
  389. // DescribeGroups return describe group response or error
  390. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  391. response := new(DescribeGroupsResponse)
  392. err := b.sendAndReceive(request, response)
  393. if err != nil {
  394. return nil, err
  395. }
  396. return response, nil
  397. }
  398. // ApiVersions return api version response or error
  399. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  400. response := new(ApiVersionsResponse)
  401. err := b.sendAndReceive(request, response)
  402. if err != nil {
  403. return nil, err
  404. }
  405. return response, nil
  406. }
  407. // CreateTopics send a create topic request and returns create topic response
  408. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  409. response := new(CreateTopicsResponse)
  410. err := b.sendAndReceive(request, response)
  411. if err != nil {
  412. return nil, err
  413. }
  414. return response, nil
  415. }
  416. // DeleteTopics sends a delete topic request and returns delete topic response
  417. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  418. response := new(DeleteTopicsResponse)
  419. err := b.sendAndReceive(request, response)
  420. if err != nil {
  421. return nil, err
  422. }
  423. return response, nil
  424. }
  425. // CreatePartitions sends a create partition request and returns create
  426. // partitions response or error
  427. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  428. response := new(CreatePartitionsResponse)
  429. err := b.sendAndReceive(request, response)
  430. if err != nil {
  431. return nil, err
  432. }
  433. return response, nil
  434. }
  435. // AlterPartitionReassignments sends a alter partition reassignments request and
  436. // returns alter partition reassignments response
  437. func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
  438. response := new(AlterPartitionReassignmentsResponse)
  439. err := b.sendAndReceive(request, response)
  440. if err != nil {
  441. return nil, err
  442. }
  443. return response, nil
  444. }
  445. // ListPartitionReassignments sends a list partition reassignments request and
  446. // returns list partition reassignments response
  447. func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
  448. response := new(ListPartitionReassignmentsResponse)
  449. err := b.sendAndReceive(request, response)
  450. if err != nil {
  451. return nil, err
  452. }
  453. return response, nil
  454. }
  455. // DeleteRecords send a request to delete records and return delete record
  456. // response or error
  457. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  458. response := new(DeleteRecordsResponse)
  459. err := b.sendAndReceive(request, response)
  460. if err != nil {
  461. return nil, err
  462. }
  463. return response, nil
  464. }
  465. // DescribeAcls sends a describe acl request and returns a response or error
  466. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  467. response := new(DescribeAclsResponse)
  468. err := b.sendAndReceive(request, response)
  469. if err != nil {
  470. return nil, err
  471. }
  472. return response, nil
  473. }
  474. // CreateAcls sends a create acl request and returns a response or error
  475. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  476. response := new(CreateAclsResponse)
  477. err := b.sendAndReceive(request, response)
  478. if err != nil {
  479. return nil, err
  480. }
  481. return response, nil
  482. }
  483. // DeleteAcls sends a delete acl request and returns a response or error
  484. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  485. response := new(DeleteAclsResponse)
  486. err := b.sendAndReceive(request, response)
  487. if err != nil {
  488. return nil, err
  489. }
  490. return response, nil
  491. }
  492. // InitProducerID sends an init producer request and returns a response or error
  493. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  494. response := new(InitProducerIDResponse)
  495. err := b.sendAndReceive(request, response)
  496. if err != nil {
  497. return nil, err
  498. }
  499. return response, nil
  500. }
  501. // AddPartitionsToTxn send a request to add partition to txn and returns
  502. // a response or error
  503. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  504. response := new(AddPartitionsToTxnResponse)
  505. err := b.sendAndReceive(request, response)
  506. if err != nil {
  507. return nil, err
  508. }
  509. return response, nil
  510. }
  511. // AddOffsetsToTxn sends a request to add offsets to txn and returns a response
  512. // or error
  513. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  514. response := new(AddOffsetsToTxnResponse)
  515. err := b.sendAndReceive(request, response)
  516. if err != nil {
  517. return nil, err
  518. }
  519. return response, nil
  520. }
  521. // EndTxn sends a request to end txn and returns a response or error
  522. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  523. response := new(EndTxnResponse)
  524. err := b.sendAndReceive(request, response)
  525. if err != nil {
  526. return nil, err
  527. }
  528. return response, nil
  529. }
  530. // TxnOffsetCommit sends a request to commit transaction offsets and returns
  531. // a response or error
  532. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  533. response := new(TxnOffsetCommitResponse)
  534. err := b.sendAndReceive(request, response)
  535. if err != nil {
  536. return nil, err
  537. }
  538. return response, nil
  539. }
  540. // DescribeConfigs sends a request to describe config and returns a response or
  541. // error
  542. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  543. response := new(DescribeConfigsResponse)
  544. err := b.sendAndReceive(request, response)
  545. if err != nil {
  546. return nil, err
  547. }
  548. return response, nil
  549. }
  550. // AlterConfigs sends a request to alter config and return a response or error
  551. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  552. response := new(AlterConfigsResponse)
  553. err := b.sendAndReceive(request, response)
  554. if err != nil {
  555. return nil, err
  556. }
  557. return response, nil
  558. }
  559. // IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
  560. func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
  561. response := new(IncrementalAlterConfigsResponse)
  562. err := b.sendAndReceive(request, response)
  563. if err != nil {
  564. return nil, err
  565. }
  566. return response, nil
  567. }
  568. // DeleteGroups sends a request to delete groups and returns a response or error
  569. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  570. response := new(DeleteGroupsResponse)
  571. if err := b.sendAndReceive(request, response); err != nil {
  572. return nil, err
  573. }
  574. return response, nil
  575. }
  576. // DeleteOffsets sends a request to delete group offsets and returns a response or error
  577. func (b *Broker) DeleteOffsets(request *DeleteOffsetsRequest) (*DeleteOffsetsResponse, error) {
  578. response := new(DeleteOffsetsResponse)
  579. if err := b.sendAndReceive(request, response); err != nil {
  580. return nil, err
  581. }
  582. return response, nil
  583. }
  584. // DescribeLogDirs sends a request to get the broker's log dir paths and sizes
  585. func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
  586. response := new(DescribeLogDirsResponse)
  587. err := b.sendAndReceive(request, response)
  588. if err != nil {
  589. return nil, err
  590. }
  591. return response, nil
  592. }
  593. // DescribeUserScramCredentials sends a request to get SCRAM users
  594. func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
  595. res := new(DescribeUserScramCredentialsResponse)
  596. err := b.sendAndReceive(req, res)
  597. if err != nil {
  598. return nil, err
  599. }
  600. return res, err
  601. }
  602. func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
  603. res := new(AlterUserScramCredentialsResponse)
  604. err := b.sendAndReceive(req, res)
  605. if err != nil {
  606. return nil, err
  607. }
  608. return res, nil
  609. }
  610. // DescribeClientQuotas sends a request to get the broker's quotas
  611. func (b *Broker) DescribeClientQuotas(request *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) {
  612. response := new(DescribeClientQuotasResponse)
  613. err := b.sendAndReceive(request, response)
  614. if err != nil {
  615. return nil, err
  616. }
  617. return response, nil
  618. }
  619. // AlterClientQuotas sends a request to alter the broker's quotas
  620. func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
  621. response := new(AlterClientQuotasResponse)
  622. err := b.sendAndReceive(request, response)
  623. if err != nil {
  624. return nil, err
  625. }
  626. return response, nil
  627. }
  628. // readFull ensures the conn ReadDeadline has been setup before making a
  629. // call to io.ReadFull
  630. func (b *Broker) readFull(buf []byte) (n int, err error) {
  631. if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
  632. return 0, err
  633. }
  634. return io.ReadFull(b.conn, buf)
  635. }
  636. // write ensures the conn WriteDeadline has been setup before making a
  637. // call to conn.Write
  638. func (b *Broker) write(buf []byte) (n int, err error) {
  639. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  640. return 0, err
  641. }
  642. return b.conn.Write(buf)
  643. }
  644. func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
  645. b.lock.Lock()
  646. defer b.lock.Unlock()
  647. if b.conn == nil {
  648. if b.connErr != nil {
  649. return nil, b.connErr
  650. }
  651. return nil, ErrNotConnected
  652. }
  653. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  654. return nil, ErrUnsupportedVersion
  655. }
  656. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  657. buf, err := encode(req, b.conf.MetricRegistry)
  658. if err != nil {
  659. return nil, err
  660. }
  661. requestTime := time.Now()
  662. // Will be decremented in responseReceiver (except error or request with NoResponse)
  663. b.addRequestInFlightMetrics(1)
  664. bytes, err := b.write(buf)
  665. b.updateOutgoingCommunicationMetrics(bytes)
  666. if err != nil {
  667. b.addRequestInFlightMetrics(-1)
  668. return nil, err
  669. }
  670. b.correlationID++
  671. if !promiseResponse {
  672. // Record request latency without the response
  673. b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
  674. return nil, nil
  675. }
  676. promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
  677. b.responses <- promise
  678. return &promise, nil
  679. }
  680. func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
  681. responseHeaderVersion := int16(-1)
  682. if res != nil {
  683. responseHeaderVersion = res.headerVersion()
  684. }
  685. promise, err := b.send(req, res != nil, responseHeaderVersion)
  686. if err != nil {
  687. return err
  688. }
  689. if promise == nil {
  690. return nil
  691. }
  692. select {
  693. case buf := <-promise.packets:
  694. return versionedDecode(buf, res, req.version())
  695. case err = <-promise.errors:
  696. return err
  697. }
  698. }
  699. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  700. b.id, err = pd.getInt32()
  701. if err != nil {
  702. return err
  703. }
  704. host, err := pd.getString()
  705. if err != nil {
  706. return err
  707. }
  708. port, err := pd.getInt32()
  709. if err != nil {
  710. return err
  711. }
  712. if version >= 1 {
  713. b.rack, err = pd.getNullableString()
  714. if err != nil {
  715. return err
  716. }
  717. }
  718. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  719. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  720. return err
  721. }
  722. return nil
  723. }
  724. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  725. host, portstr, err := net.SplitHostPort(b.addr)
  726. if err != nil {
  727. return err
  728. }
  729. port, err := strconv.ParseInt(portstr, 10, 32)
  730. if err != nil {
  731. return err
  732. }
  733. pe.putInt32(b.id)
  734. err = pe.putString(host)
  735. if err != nil {
  736. return err
  737. }
  738. pe.putInt32(int32(port))
  739. if version >= 1 {
  740. err = pe.putNullableString(b.rack)
  741. if err != nil {
  742. return err
  743. }
  744. }
  745. return nil
  746. }
  747. func (b *Broker) responseReceiver() {
  748. var dead error
  749. for response := range b.responses {
  750. if dead != nil {
  751. // This was previously incremented in send() and
  752. // we are not calling updateIncomingCommunicationMetrics()
  753. b.addRequestInFlightMetrics(-1)
  754. response.errors <- dead
  755. continue
  756. }
  757. headerLength := getHeaderLength(response.headerVersion)
  758. header := make([]byte, headerLength)
  759. bytesReadHeader, err := b.readFull(header)
  760. requestLatency := time.Since(response.requestTime)
  761. if err != nil {
  762. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  763. dead = err
  764. response.errors <- err
  765. continue
  766. }
  767. decodedHeader := responseHeader{}
  768. err = versionedDecode(header, &decodedHeader, response.headerVersion)
  769. if err != nil {
  770. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  771. dead = err
  772. response.errors <- err
  773. continue
  774. }
  775. if decodedHeader.correlationID != response.correlationID {
  776. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  777. // TODO if decoded ID < cur ID, discard until we catch up
  778. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  779. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  780. response.errors <- dead
  781. continue
  782. }
  783. buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
  784. bytesReadBody, err := b.readFull(buf)
  785. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  786. if err != nil {
  787. dead = err
  788. response.errors <- err
  789. continue
  790. }
  791. response.packets <- buf
  792. }
  793. close(b.done)
  794. }
  795. func getHeaderLength(headerVersion int16) int8 {
  796. if headerVersion < 1 {
  797. return 8
  798. } else {
  799. // header contains additional tagged field length (0), we don't support actual tags yet.
  800. return 9
  801. }
  802. }
  803. func (b *Broker) authenticateViaSASL() error {
  804. switch b.conf.Net.SASL.Mechanism {
  805. case SASLTypeOAuth:
  806. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  807. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  808. return b.sendAndReceiveSASLSCRAM()
  809. case SASLTypeGSSAPI:
  810. return b.sendAndReceiveKerberos()
  811. default:
  812. return b.sendAndReceiveSASLPlainAuth()
  813. }
  814. }
  815. func (b *Broker) sendAndReceiveKerberos() error {
  816. b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
  817. if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
  818. b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
  819. }
  820. return b.kerberosAuthenticator.Authorize(b)
  821. }
  822. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  823. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  824. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  825. buf, err := encode(req, b.conf.MetricRegistry)
  826. if err != nil {
  827. return err
  828. }
  829. requestTime := time.Now()
  830. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  831. b.addRequestInFlightMetrics(1)
  832. bytes, err := b.write(buf)
  833. b.updateOutgoingCommunicationMetrics(bytes)
  834. if err != nil {
  835. b.addRequestInFlightMetrics(-1)
  836. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  837. return err
  838. }
  839. b.correlationID++
  840. header := make([]byte, 8) // response header
  841. _, err = b.readFull(header)
  842. if err != nil {
  843. b.addRequestInFlightMetrics(-1)
  844. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  845. return err
  846. }
  847. length := binary.BigEndian.Uint32(header[:4])
  848. payload := make([]byte, length-4)
  849. n, err := b.readFull(payload)
  850. if err != nil {
  851. b.addRequestInFlightMetrics(-1)
  852. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  853. return err
  854. }
  855. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  856. res := &SaslHandshakeResponse{}
  857. err = versionedDecode(payload, res, 0)
  858. if err != nil {
  859. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  860. return err
  861. }
  862. if res.Err != ErrNoError {
  863. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  864. return res.Err
  865. }
  866. DebugLogger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  867. return nil
  868. }
  869. // Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
  870. // Kafka 1.x.x onward added a SaslAuthenticate request/response message which
  871. // wraps the SASL flow in the Kafka protocol, which allows for returning
  872. // meaningful errors on authentication failure.
  873. //
  874. // In SASL Plain, Kafka expects the auth header to be in the following format
  875. // Message format (from https://tools.ietf.org/html/rfc4616):
  876. //
  877. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  878. // authcid = 1*SAFE ; MUST accept up to 255 octets
  879. // authzid = 1*SAFE ; MUST accept up to 255 octets
  880. // passwd = 1*SAFE ; MUST accept up to 255 octets
  881. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  882. //
  883. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  884. // ;; any UTF-8 encoded Unicode character except NUL
  885. //
  886. // With SASL v0 handshake and auth then:
  887. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  888. // When credentials are invalid, Kafka closes the connection.
  889. //
  890. // With SASL v1 handshake and auth then:
  891. // When credentials are invalid, Kafka replies with a SaslAuthenticate response
  892. // containing an error code and message detailing the authentication failure.
  893. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  894. // default to V0 to allow for backward compatibility when SASL is enabled
  895. // but not the handshake
  896. if b.conf.Net.SASL.Handshake {
  897. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
  898. if handshakeErr != nil {
  899. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  900. return handshakeErr
  901. }
  902. }
  903. if b.conf.Net.SASL.Version == SASLHandshakeV1 {
  904. return b.sendAndReceiveV1SASLPlainAuth()
  905. }
  906. return b.sendAndReceiveV0SASLPlainAuth()
  907. }
  908. // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
  909. func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
  910. length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  911. authBytes := make([]byte, length+4) // 4 byte length header + auth data
  912. binary.BigEndian.PutUint32(authBytes, uint32(length))
  913. copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
  914. requestTime := time.Now()
  915. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  916. b.addRequestInFlightMetrics(1)
  917. bytesWritten, err := b.write(authBytes)
  918. b.updateOutgoingCommunicationMetrics(bytesWritten)
  919. if err != nil {
  920. b.addRequestInFlightMetrics(-1)
  921. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  922. return err
  923. }
  924. header := make([]byte, 4)
  925. n, err := b.readFull(header)
  926. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  927. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  928. // Otherwise, the broker closes the connection and we get an EOF
  929. if err != nil {
  930. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  931. return err
  932. }
  933. DebugLogger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  934. return nil
  935. }
  936. // sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
  937. func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
  938. correlationID := b.correlationID
  939. requestTime := time.Now()
  940. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  941. b.addRequestInFlightMetrics(1)
  942. bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
  943. b.updateOutgoingCommunicationMetrics(bytesWritten)
  944. if err != nil {
  945. b.addRequestInFlightMetrics(-1)
  946. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  947. return err
  948. }
  949. b.correlationID++
  950. bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
  951. b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
  952. // With v1 sasl we get an error message set in the response we can return
  953. if err != nil {
  954. Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
  955. return err
  956. }
  957. return nil
  958. }
  959. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  960. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  961. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  962. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  963. return err
  964. }
  965. token, err := provider.Token()
  966. if err != nil {
  967. return err
  968. }
  969. message, err := buildClientFirstMessage(token)
  970. if err != nil {
  971. return err
  972. }
  973. challenged, err := b.sendClientMessage(message)
  974. if err != nil {
  975. return err
  976. }
  977. if challenged {
  978. // Abort the token exchange. The broker returns the failure code.
  979. _, err = b.sendClientMessage([]byte(`\x01`))
  980. }
  981. return err
  982. }
  983. // sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
  984. // if the broker responds with a challenge, in which case the token is
  985. // rejected.
  986. func (b *Broker) sendClientMessage(message []byte) (bool, error) {
  987. requestTime := time.Now()
  988. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  989. b.addRequestInFlightMetrics(1)
  990. correlationID := b.correlationID
  991. bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
  992. b.updateOutgoingCommunicationMetrics(bytesWritten)
  993. if err != nil {
  994. b.addRequestInFlightMetrics(-1)
  995. return false, err
  996. }
  997. b.correlationID++
  998. res := &SaslAuthenticateResponse{}
  999. bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
  1000. requestLatency := time.Since(requestTime)
  1001. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  1002. isChallenge := len(res.SaslAuthBytes) > 0
  1003. if isChallenge && err != nil {
  1004. Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
  1005. }
  1006. return isChallenge, err
  1007. }
  1008. func (b *Broker) sendAndReceiveSASLSCRAM() error {
  1009. if b.conf.Net.SASL.Version == SASLHandshakeV0 {
  1010. return b.sendAndReceiveSASLSCRAMv0()
  1011. }
  1012. return b.sendAndReceiveSASLSCRAMv1()
  1013. }
  1014. func (b *Broker) sendAndReceiveSASLSCRAMv0() error {
  1015. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV0); err != nil {
  1016. return err
  1017. }
  1018. scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
  1019. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  1020. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  1021. }
  1022. msg, err := scramClient.Step("")
  1023. if err != nil {
  1024. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  1025. }
  1026. for !scramClient.Done() {
  1027. requestTime := time.Now()
  1028. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  1029. b.addRequestInFlightMetrics(1)
  1030. length := len(msg)
  1031. authBytes := make([]byte, length+4) //4 byte length header + auth data
  1032. binary.BigEndian.PutUint32(authBytes, uint32(length))
  1033. copy(authBytes[4:], []byte(msg))
  1034. _, err := b.write(authBytes)
  1035. b.updateOutgoingCommunicationMetrics(length + 4)
  1036. if err != nil {
  1037. b.addRequestInFlightMetrics(-1)
  1038. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  1039. return err
  1040. }
  1041. b.correlationID++
  1042. header := make([]byte, 4)
  1043. _, err = b.readFull(header)
  1044. if err != nil {
  1045. b.addRequestInFlightMetrics(-1)
  1046. Logger.Printf("Failed to read response header while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  1047. return err
  1048. }
  1049. payload := make([]byte, int32(binary.BigEndian.Uint32(header)))
  1050. n, err := b.readFull(payload)
  1051. if err != nil {
  1052. b.addRequestInFlightMetrics(-1)
  1053. Logger.Printf("Failed to read response payload while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  1054. return err
  1055. }
  1056. b.updateIncomingCommunicationMetrics(n+4, time.Since(requestTime))
  1057. msg, err = scramClient.Step(string(payload))
  1058. if err != nil {
  1059. Logger.Println("SASL authentication failed", err)
  1060. return err
  1061. }
  1062. }
  1063. DebugLogger.Println("SASL authentication succeeded")
  1064. return nil
  1065. }
  1066. func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
  1067. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
  1068. return err
  1069. }
  1070. scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
  1071. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  1072. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  1073. }
  1074. msg, err := scramClient.Step("")
  1075. if err != nil {
  1076. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  1077. }
  1078. for !scramClient.Done() {
  1079. requestTime := time.Now()
  1080. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  1081. b.addRequestInFlightMetrics(1)
  1082. correlationID := b.correlationID
  1083. bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
  1084. b.updateOutgoingCommunicationMetrics(bytesWritten)
  1085. if err != nil {
  1086. b.addRequestInFlightMetrics(-1)
  1087. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  1088. return err
  1089. }
  1090. b.correlationID++
  1091. challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
  1092. if err != nil {
  1093. b.addRequestInFlightMetrics(-1)
  1094. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  1095. return err
  1096. }
  1097. b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
  1098. msg, err = scramClient.Step(string(challenge))
  1099. if err != nil {
  1100. Logger.Println("SASL authentication failed", err)
  1101. return err
  1102. }
  1103. }
  1104. DebugLogger.Println("SASL authentication succeeded")
  1105. return nil
  1106. }
  1107. func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
  1108. rb := &SaslAuthenticateRequest{msg}
  1109. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  1110. buf, err := encode(req, b.conf.MetricRegistry)
  1111. if err != nil {
  1112. return 0, err
  1113. }
  1114. return b.write(buf)
  1115. }
  1116. func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
  1117. buf := make([]byte, responseLengthSize+correlationIDSize)
  1118. _, err := b.readFull(buf)
  1119. if err != nil {
  1120. return nil, err
  1121. }
  1122. header := responseHeader{}
  1123. err = versionedDecode(buf, &header, 0)
  1124. if err != nil {
  1125. return nil, err
  1126. }
  1127. if header.correlationID != correlationID {
  1128. return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  1129. }
  1130. buf = make([]byte, header.length-correlationIDSize)
  1131. _, err = b.readFull(buf)
  1132. if err != nil {
  1133. return nil, err
  1134. }
  1135. res := &SaslAuthenticateResponse{}
  1136. if err := versionedDecode(buf, res, 0); err != nil {
  1137. return nil, err
  1138. }
  1139. if res.Err != ErrNoError {
  1140. return nil, res.Err
  1141. }
  1142. return res.SaslAuthBytes, nil
  1143. }
  1144. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  1145. // https://tools.ietf.org/html/rfc7628
  1146. func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
  1147. var ext string
  1148. if token.Extensions != nil && len(token.Extensions) > 0 {
  1149. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  1150. return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
  1151. }
  1152. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  1153. }
  1154. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  1155. return resp, nil
  1156. }
  1157. // mapToString returns a list of key-value pairs ordered by key.
  1158. // keyValSep separates the key from the value. elemSep separates each pair.
  1159. func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  1160. buf := make([]string, 0, len(extensions))
  1161. for k, v := range extensions {
  1162. buf = append(buf, k+keyValSep+v)
  1163. }
  1164. sort.Strings(buf)
  1165. return strings.Join(buf, elemSep)
  1166. }
  1167. func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
  1168. authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
  1169. rb := &SaslAuthenticateRequest{authBytes}
  1170. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  1171. buf, err := encode(req, b.conf.MetricRegistry)
  1172. if err != nil {
  1173. return 0, err
  1174. }
  1175. return b.write(buf)
  1176. }
  1177. func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
  1178. rb := &SaslAuthenticateRequest{initialResp}
  1179. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  1180. buf, err := encode(req, b.conf.MetricRegistry)
  1181. if err != nil {
  1182. return 0, err
  1183. }
  1184. return b.write(buf)
  1185. }
  1186. func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
  1187. buf := make([]byte, responseLengthSize+correlationIDSize)
  1188. bytesRead, err := b.readFull(buf)
  1189. if err != nil {
  1190. return bytesRead, err
  1191. }
  1192. header := responseHeader{}
  1193. err = versionedDecode(buf, &header, 0)
  1194. if err != nil {
  1195. return bytesRead, err
  1196. }
  1197. if header.correlationID != correlationID {
  1198. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  1199. }
  1200. buf = make([]byte, header.length-correlationIDSize)
  1201. c, err := b.readFull(buf)
  1202. bytesRead += c
  1203. if err != nil {
  1204. return bytesRead, err
  1205. }
  1206. if err := versionedDecode(buf, res, 0); err != nil {
  1207. return bytesRead, err
  1208. }
  1209. if res.Err != ErrNoError {
  1210. return bytesRead, res.Err
  1211. }
  1212. return bytesRead, nil
  1213. }
  1214. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  1215. b.updateRequestLatencyAndInFlightMetrics(requestLatency)
  1216. b.responseRate.Mark(1)
  1217. if b.brokerResponseRate != nil {
  1218. b.brokerResponseRate.Mark(1)
  1219. }
  1220. responseSize := int64(bytes)
  1221. b.incomingByteRate.Mark(responseSize)
  1222. if b.brokerIncomingByteRate != nil {
  1223. b.brokerIncomingByteRate.Mark(responseSize)
  1224. }
  1225. b.responseSize.Update(responseSize)
  1226. if b.brokerResponseSize != nil {
  1227. b.brokerResponseSize.Update(responseSize)
  1228. }
  1229. }
  1230. func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
  1231. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  1232. b.requestLatency.Update(requestLatencyInMs)
  1233. if b.brokerRequestLatency != nil {
  1234. b.brokerRequestLatency.Update(requestLatencyInMs)
  1235. }
  1236. b.addRequestInFlightMetrics(-1)
  1237. }
  1238. func (b *Broker) addRequestInFlightMetrics(i int64) {
  1239. b.requestsInFlight.Inc(i)
  1240. if b.brokerRequestsInFlight != nil {
  1241. b.brokerRequestsInFlight.Inc(i)
  1242. }
  1243. }
  1244. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  1245. b.requestRate.Mark(1)
  1246. if b.brokerRequestRate != nil {
  1247. b.brokerRequestRate.Mark(1)
  1248. }
  1249. requestSize := int64(bytes)
  1250. b.outgoingByteRate.Mark(requestSize)
  1251. if b.brokerOutgoingByteRate != nil {
  1252. b.brokerOutgoingByteRate.Mark(requestSize)
  1253. }
  1254. b.requestSize.Update(requestSize)
  1255. if b.brokerRequestSize != nil {
  1256. b.brokerRequestSize.Update(requestSize)
  1257. }
  1258. }
  1259. func (b *Broker) registerMetrics() {
  1260. b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
  1261. b.brokerRequestRate = b.registerMeter("request-rate")
  1262. b.brokerRequestSize = b.registerHistogram("request-size")
  1263. b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
  1264. b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
  1265. b.brokerResponseRate = b.registerMeter("response-rate")
  1266. b.brokerResponseSize = b.registerHistogram("response-size")
  1267. b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
  1268. b.brokerThrottleTime = b.registerHistogram("throttle-time-in-ms")
  1269. }
  1270. func (b *Broker) unregisterMetrics() {
  1271. for _, name := range b.registeredMetrics {
  1272. b.conf.MetricRegistry.Unregister(name)
  1273. }
  1274. b.registeredMetrics = nil
  1275. }
  1276. func (b *Broker) registerMeter(name string) metrics.Meter {
  1277. nameForBroker := getMetricNameForBroker(name, b)
  1278. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1279. return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
  1280. }
  1281. func (b *Broker) registerHistogram(name string) metrics.Histogram {
  1282. nameForBroker := getMetricNameForBroker(name, b)
  1283. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1284. return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
  1285. }
  1286. func (b *Broker) registerCounter(name string) metrics.Counter {
  1287. nameForBroker := getMetricNameForBroker(name, b)
  1288. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1289. return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
  1290. }
  1291. func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
  1292. if cfg == nil {
  1293. cfg = &tls.Config{
  1294. MinVersion: tls.VersionTLS12,
  1295. }
  1296. }
  1297. if cfg.ServerName != "" {
  1298. return cfg
  1299. }
  1300. c := cfg.Clone()
  1301. sn, _, err := net.SplitHostPort(addr)
  1302. if err != nil {
  1303. Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
  1304. }
  1305. c.ServerName = sn
  1306. return c
  1307. }