gssapi_kerberos.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "strings"
  9. "time"
  10. "github.com/jcmturner/gofork/encoding/asn1"
  11. "github.com/jcmturner/gokrb5/v8/asn1tools"
  12. "github.com/jcmturner/gokrb5/v8/gssapi"
  13. "github.com/jcmturner/gokrb5/v8/iana/chksumtype"
  14. "github.com/jcmturner/gokrb5/v8/iana/keyusage"
  15. "github.com/jcmturner/gokrb5/v8/messages"
  16. "github.com/jcmturner/gokrb5/v8/types"
  17. )
  18. const (
  19. TOK_ID_KRB_AP_REQ = 256
  20. GSS_API_GENERIC_TAG = 0x60
  21. KRB5_USER_AUTH = 1
  22. KRB5_KEYTAB_AUTH = 2
  23. GSS_API_INITIAL = 1
  24. GSS_API_VERIFY = 2
  25. GSS_API_FINISH = 3
  26. )
  27. type GSSAPIConfig struct {
  28. AuthType int
  29. KeyTabPath string
  30. KerberosConfigPath string
  31. ServiceName string
  32. Username string
  33. Password string
  34. Realm string
  35. DisablePAFXFAST bool
  36. }
  37. type GSSAPIKerberosAuth struct {
  38. Config *GSSAPIConfig
  39. ticket messages.Ticket
  40. encKey types.EncryptionKey
  41. NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
  42. step int
  43. }
  44. type KerberosClient interface {
  45. Login() error
  46. GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
  47. Domain() string
  48. CName() types.PrincipalName
  49. Destroy()
  50. }
  51. // writePackage appends length in big endian before the payload, and sends it to kafka
  52. func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
  53. length := uint64(len(payload))
  54. size := length + 4 // 4 byte length header + payload
  55. if size > math.MaxInt32 {
  56. return 0, errors.New("payload too large, will overflow int32")
  57. }
  58. finalPackage := make([]byte, size)
  59. copy(finalPackage[4:], payload)
  60. binary.BigEndian.PutUint32(finalPackage, uint32(length))
  61. bytes, err := broker.conn.Write(finalPackage)
  62. if err != nil {
  63. return bytes, err
  64. }
  65. return bytes, nil
  66. }
  67. // readPackage reads payload length (4 bytes) and then reads the payload into []byte
  68. func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
  69. bytesRead := 0
  70. lengthInBytes := make([]byte, 4)
  71. bytes, err := io.ReadFull(broker.conn, lengthInBytes)
  72. if err != nil {
  73. return nil, bytesRead, err
  74. }
  75. bytesRead += bytes
  76. payloadLength := binary.BigEndian.Uint32(lengthInBytes)
  77. payloadBytes := make([]byte, payloadLength) // buffer for read..
  78. bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
  79. if err != nil {
  80. return payloadBytes, bytesRead, err
  81. }
  82. bytesRead += bytes
  83. return payloadBytes, bytesRead, nil
  84. }
  85. func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
  86. a := make([]byte, 24)
  87. flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
  88. binary.LittleEndian.PutUint32(a[:4], 16)
  89. for _, i := range flags {
  90. f := binary.LittleEndian.Uint32(a[20:24])
  91. f |= uint32(i)
  92. binary.LittleEndian.PutUint32(a[20:24], f)
  93. }
  94. return a
  95. }
  96. /*
  97. *
  98. * Construct Kerberos AP_REQ package, conforming to RFC-4120
  99. * https://tools.ietf.org/html/rfc4120#page-84
  100. *
  101. */
  102. func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
  103. domain string, cname types.PrincipalName,
  104. ticket messages.Ticket,
  105. sessionKey types.EncryptionKey) ([]byte, error) {
  106. auth, err := types.NewAuthenticator(domain, cname)
  107. if err != nil {
  108. return nil, err
  109. }
  110. auth.Cksum = types.Checksum{
  111. CksumType: chksumtype.GSSAPI,
  112. Checksum: krbAuth.newAuthenticatorChecksum(),
  113. }
  114. APReq, err := messages.NewAPReq(
  115. ticket,
  116. sessionKey,
  117. auth,
  118. )
  119. if err != nil {
  120. return nil, err
  121. }
  122. aprBytes := make([]byte, 2)
  123. binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
  124. tb, err := APReq.Marshal()
  125. if err != nil {
  126. return nil, err
  127. }
  128. aprBytes = append(aprBytes, tb...)
  129. return aprBytes, nil
  130. }
  131. /*
  132. *
  133. * Append the GSS-API header to the payload, conforming to RFC-2743
  134. * Section 3.1, Mechanism-Independent Token Format
  135. *
  136. * https://tools.ietf.org/html/rfc2743#page-81
  137. *
  138. * GSSAPIHeader + <specific mechanism payload>
  139. *
  140. */
  141. func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
  142. oidBytes, err := asn1.Marshal(gssapi.OIDKRB5.OID())
  143. if err != nil {
  144. return nil, err
  145. }
  146. tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
  147. GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
  148. GSSHeader = append(GSSHeader, oidBytes...)
  149. GSSPackage := append(GSSHeader, payload...)
  150. return GSSPackage, nil
  151. }
  152. func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
  153. switch krbAuth.step {
  154. case GSS_API_INITIAL:
  155. aprBytes, err := krbAuth.createKrb5Token(
  156. kerberosClient.Domain(),
  157. kerberosClient.CName(),
  158. krbAuth.ticket,
  159. krbAuth.encKey)
  160. if err != nil {
  161. return nil, err
  162. }
  163. krbAuth.step = GSS_API_VERIFY
  164. return krbAuth.appendGSSAPIHeader(aprBytes)
  165. case GSS_API_VERIFY:
  166. wrapTokenReq := gssapi.WrapToken{}
  167. if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
  168. return nil, err
  169. }
  170. // Validate response.
  171. isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
  172. if !isValid {
  173. return nil, err
  174. }
  175. wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
  176. if err != nil {
  177. return nil, err
  178. }
  179. krbAuth.step = GSS_API_FINISH
  180. return wrapTokenResponse.Marshal()
  181. }
  182. return nil, nil
  183. }
  184. /* This does the handshake for authorization */
  185. func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
  186. kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
  187. if err != nil {
  188. Logger.Printf("Kerberos client error: %s", err)
  189. return err
  190. }
  191. err = kerberosClient.Login()
  192. if err != nil {
  193. Logger.Printf("Kerberos client error: %s", err)
  194. return err
  195. }
  196. // Construct SPN using serviceName and host
  197. // SPN format: <SERVICE>/<FQDN>
  198. host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
  199. spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
  200. ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
  201. if err != nil {
  202. Logger.Printf("Error getting Kerberos service ticket : %s", err)
  203. return err
  204. }
  205. krbAuth.ticket = ticket
  206. krbAuth.encKey = encKey
  207. krbAuth.step = GSS_API_INITIAL
  208. var receivedBytes []byte = nil
  209. defer kerberosClient.Destroy()
  210. for {
  211. packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
  212. if err != nil {
  213. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  214. return err
  215. }
  216. requestTime := time.Now()
  217. bytesWritten, err := krbAuth.writePackage(broker, packBytes)
  218. if err != nil {
  219. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  220. return err
  221. }
  222. broker.updateOutgoingCommunicationMetrics(bytesWritten)
  223. if krbAuth.step == GSS_API_VERIFY {
  224. bytesRead := 0
  225. receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
  226. requestLatency := time.Since(requestTime)
  227. broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  228. if err != nil {
  229. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  230. return err
  231. }
  232. } else if krbAuth.step == GSS_API_FINISH {
  233. return nil
  234. }
  235. }
  236. }