kafka.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. package kpt
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "fmt"
  7. "github.com/Shopify/sarama"
  8. "github.com/Unknwon/goconfig"
  9. "github.com/siddontang/go-log/log"
  10. "io/ioutil"
  11. "os"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
// Package-level configuration, populated from config.ini's [kafka] section
// by Kafka_Consumer (see the MustValue/MustInt calls there) and shared with
// the producer side.
var (
	kafka_host   string // Kafka broker host (config key "kafka_host", default 127.0.0.1)
	kafka_port   int    // Kafka broker port (config key "kafka_port", default 9092)
	kafka_topic  string // topic produced to / consumed from (default "kafka_go_test")
	saslEnable   bool   // enable SASL authentication on the sarama client
	username     string // SASL user name
	saslpassword string // SASL password
	tlsEnable    bool   // enable TLS; certificate paths below are then required
	clientcert   string // path to client certificate (PEM) for mutual TLS
	clientkey    string // path to client private key (PEM)
	cacert       string // path to CA certificate (PEM) used to verify the broker
	Offset       int64  // consumer offset; persisted back to config.ini after each message
	Breaki       int    // set >0 to ask the consumer message loop to stop
	// onetimerows is the produce batch size, kept as a string and parsed with
	// strconv.Atoi on every loop iteration in Kafka_producerDB.
	onetimerows string
	// onetimerowsMqtt mirrors onetimerows for an MQTT path; not referenced in
	// the visible portion of this file — presumably read elsewhere.
	onetimerowsMqtt string
)
  33. func Kafka_producer(msgs string) bool{
  34. config := sarama.NewConfig()
  35. config.Producer.RequiredAcks = sarama.WaitForAll
  36. config.Producer.Partitioner = sarama.NewRandomPartitioner
  37. config.Producer.Return.Successes = true
  38. config.Producer.Return.Errors = true
  39. config.Version = sarama.V0_11_0_2
  40. var Address = []string{"127.0.0.1:9092"}
  41. client, err := sarama.NewClient(Address, config)
  42. if err != nil {
  43. log.Fatalf("unable to create kafka client: %q", err)
  44. return false
  45. }
  46. producer, err := sarama.NewSyncProducerFromClient(client)
  47. if err != nil {
  48. fmt.Printf("producer_test create producer error :%s\n", err.Error())
  49. return false
  50. }
  51. defer producer.Close()
  52. partition, offset,err := producer.SendMessage(&sarama.ProducerMessage{Topic: "kafka_go_test", Key: sarama.StringEncoder("go_test"), Value: sarama.StringEncoder(msgs)})
  53. if err!=nil {
  54. log.Printf("send message(%s) err=%s \n", msgs, err)
  55. return false
  56. }else {
  57. fmt.Fprintf(os.Stdout, msgs + "发送成功,partition=%d, offset=%d \n", partition, offset)
  58. }
  59. return true
  60. }
// Log models one row of a SQL replication log: a sequence id plus the raw
// statement text. Not referenced in the visible portion of this file —
// presumably used by the commented-out DB-backed producer path; verify
// against the rest of the package before removing.
type Log struct {
	id  int64  // sequence / primary-key value
	sql string // raw SQL statement text
}
// LogCou holds a single count value (presumably the result of a COUNT(*)
// over the log table — not used in the visible portion of this file).
type LogCou struct {
	cou int64 // row count
}
// Kafka_producerDB is the long-running produce loop: it drains batches of
// pending records from the global DqueueIndex disk queue and publishes them
// to the configured Kafka topic until ctx is cancelled.
//
// Connection handling: client/producer creation retries forever via the
// openKafka label (1s backoff); after more than 10 consecutive send failures
// both are torn down and rebuilt the same way. On TLS-config failure the
// process exits (log.Fatal).
//
// NOTE(review): the batch size comes from onetimerows (a string parsed each
// iteration); if parsing fails, int_ keeps its zero value — confirm that
// PopPeeksV2(0) is the intended fallback.
func Kafka_producerDB(ctx context.Context) {
	//var id int
	//var ssql string
	//var mydbe *sql.DB
	//ssql = ""
	fmt.Println("kafka 启动")
	// Count of consecutive SendMessages failures; >10 triggers a full
	// reconnect (see goto openKafka below).
	errortimes := 0
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true // both Return flags required by SyncProducer
	config.Producer.Return.Errors = true
	if saslEnable {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = username
		config.Net.SASL.Password = saslpassword
	}
	if tlsEnable {
		//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
		tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
		if err != nil {
			log.Fatal(err)
		}
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Version = sarama.V0_11_0_2
	var Address = []string{kafka_host + ":" + strconv.Itoa(kafka_port)}
	// Retry target: both initial connection failures and repeated send
	// failures jump back here to rebuild the client and producer.
openKafka:
	client, err := sarama.NewClient(Address, config)
	if err != nil {
		logger.Errorf("unable to create kafka client: %q \r\n", err)
		time.Sleep(1 * time.Second)
		goto openKafka
	}
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		logger.Errorf("unable to create kafka producer: %q \r\n", err)
		time.Sleep(1 * time.Second)
		if client != nil {
			err = client.Close()
		}
		goto openKafka
	}
	/***
	openDb:
	if mydbe != nil {
	mydbe.Close()
	mydbe = nil
	}
	if deletestmt != nil {
	deletestmt.Close()
	deletestmt = nil
	}
	mydbe, err = GetmyDbsConnect(user , password, host, port, "sqllog")
	if err != nil {
	logger.Errorf("Error opening DBS :%s \n", err)
	time.Sleep(1 * time.Second)
	goto openDb
	}
	***/
	for {
		select {
		case <-ctx.Done():
			fmt.Println("kafka 退出")
			return
		default:
			// parameter binding can avoid SQL injection (note kept from the
			// original DB-backed variant below)
			// Too many consecutive failures: rebuild the connection.
			if errortimes > 10 {
				errortimes = 0
				if producer != nil {
					err = producer.Close()
				}
				if client != nil {
					err = client.Close()
				}
				goto openKafka
			}
			msgs := make([]*sarama.ProducerMessage, 0)
			// Batch size, re-parsed each pass; int_ stays 0 on parse error.
			int_, err := strconv.Atoi(onetimerows)
			if err == nil {
			} else {
				fmt.Println("转换onetimerows错误 \r\n", err.Error())
			}
			// Peek up to int_ records without removing them; full/null are
			// queue-state flags consumed below when acknowledging.
			PopPeeks, err, full, null := DqueueIndex.PopPeeksV2(int_)
			if err != nil {
				fmt.Println("PopPeeks,err ", err)
			}
			//fmt.Println("PopPeeks len ", len(PopPeeks),PopPeeks)
			if len(PopPeeks) == 0 {
				time.Sleep(time.Second)
				continue
			}
			// Partial batch and the last send was recent: wait for more data
			// to accumulate instead of sending a small batch.
			if len(PopPeeks) < int_ && time.Since(insertlog_lasttime) < 10*time.Second {
				time.Sleep(time.Second)
				continue
			}
			//fmt.Println("PopPeeks len ", len(PopPeeks),string(PopPeeks[0]),string(PopPeeks[len(PopPeeks)-1]))
			for i := 0; i < len(PopPeeks); i++ {
				msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""),
					Value: sarama.StringEncoder(string(PopPeeks[i]))}
				msgs = append(msgs, msg)
			}
			if len(msgs) > 0 {
				err = producer.SendMessages(msgs)
				//fmt.Println("发送成功 ,发送数 ", len(msgs))
				if err != nil {
					logger.Errorf("send message err=%s \r\n", err.Error())
					errortimes++
				} else {
					// Send succeeded: acknowledge by popping the peeked
					// index entries off the queue.
					if !full && null {
						// NOTE(review): this branch pops len(msgs)+1 entries —
						// it looks like a handshake with PopPeeksV2's
						// full/null flags, but verify it cannot drop a record.
						for i := 0; i < len(msgs)+1; i++ {
							_, _ = DqueueIndex.PopOneIndex()
							if showlog > 0 {
								//fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1",string(pop),err)
							}
							//
						}
					} else {
						for i := 0; i < len(msgs); i++ {
							_, _ = DqueueIndex.PopOneIndex()
							if showlog > 0 {
								//fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2",string(pop),err)
							}
						}
						logger.Infof("发送成功 , 发送数 %d \r\n", len(msgs))
					}
					insertlog_lasttime = time.Now()
				}
			} else {
				// NOTE(review): format string has no verb for err.
				logger.Errorf("写指针为空 \r\n", err)
			}
			//}else {
			//	logger.Errorf("打开文件为空 \r\n", err)
			//}
			//if SqlBitcask!= nil {
			//	currentWrite_, err := SqlBitcask.Get([]byte("currentWrite"))
			//	//fmt.Printf("发送成功currentWrite_ , id = %d, 发送数 %d \r\n",currentWrite_)
			//	if err == nil{
			//	currentRead := -1
			//	currentRead_, err := SqlBitcask.Get([]byte("currentRead"))
			//	//fmt.Printf("currentRead_ , id = %d, 发送数 %d \r\n",currentRead)
			//	if err == nil{
			//	currentRead = BytesToInt(currentRead_)
			//	}else {
			//	logger.Errorf("读currentRead错误 \r\n", err.Error())
			//	}
			//	currentWrite := BytesToInt(currentWrite_)
			//	sendcount := currentWrite-currentRead
			//	int_, err := strconv.Atoi(onetimerows)
			//	if err==nil{
			//	if (sendcount>int_){
			//	sendcount = int_
			//	}
			//	}else {
			//	logger.Errorf("转换onetimerows错误 \r\n", err.Error())
			//	}
			//	oldread := currentRead+1
			//	msgs := make([]*sarama.ProducerMessage, 0)
			//	for i := 0; i < sendcount; i++ {
			//	currentRead++
			//	sql_, err := SqlBitcask.Get(IntToBytes(currentRead))
			//	if err==nil{
			//	msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""), Value: sarama.StringEncoder(string(sql_[:]))}
			//	msgs = append(msgs, msg)
			//
			//	}else {
			//	logger.Errorf("读%d 数据错误 \r\n", currentRead, err.Error())
			//	}
			//	}
			//	if len(msgs)>0 {
			//	//err = producer.SendMessages(msgs)
			//
			//	if err != nil {
			//	logger.Errorf("send message err=%s \r\n", err.Error())
			//	errortimes ++
			//	} else {
			//	if showlog>0 {
			//	fmt.Printf("发送成功 , id = %d, 发送数 %d currentRead %d \r\n", id, len(msgs),currentRead)
			//	}
			//	logger.Infof("发送成功 , id = %d, 发送数 %d readfrom %d currentRead %d \r\n", id, len(msgs),oldread,currentRead)
			//
			//	err = SqlBitcask.Put([]byte("currentRead"), IntToBytes(currentRead))
			//	if err !=nil {
			//	logger.Errorf("写currentRead %d 错误 \r\n", currentRead, err.Error())
			//	}
			//	for i := 0; i < sendcount; i++ {
			//	err = SqlBitcask.Delete(IntToBytes(oldread+i))
			//	if err!=nil{
			//	logger.Errorf("删除read错误 %d \r\n", oldread+i, err.Error())
			//	}
			//	}
			//	}
			//	}
			//	}else{
			//	logger.Errorf("写指针为空 \r\n", err)
			//	}
			//}else {
			//	logger.Errorf("打开文件为空 \r\n", err)
			//}
			//time.Sleep(1 * time.Second)
		}
	}
}
var (
	// wg tracks the per-partition consumer goroutines spawned by
	// Kafka_Consumer; the outer loop waits on it before restarting.
	wg sync.WaitGroup
)
  275. func Kafka_Consumer() {
  276. //创建消费者
  277. Breaki = 0
  278. for ;; {
  279. func() {
  280. CurrentPath, _ = GetCurrentPath()
  281. inicfg1, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
  282. if err != nil {
  283. fmt.Println("读取配置文件失败[config.ini]")
  284. return
  285. }
  286. kafka_host = inicfg1.MustValue("kafka", "kafka_host", "127.0.0.1")
  287. kafka_port = inicfg1.MustInt("kafka", "kafka_port", 9092)
  288. kafka_topic = inicfg1.MustValue("kafka", "kafka_topic", "kafka_go_test")
  289. saslEnable = inicfg1.MustBool("kafka", "saslEnable", false)
  290. username = inicfg1.MustValue("kafka", "username", "root")
  291. saslpassword = inicfg1.MustValue("kafka", "saslpassword", "root")
  292. tlsEnable = inicfg1.MustBool("kafka", "tlsEnable", false)
  293. clientcert = inicfg1.MustValue("kafka", "clientcert", "")
  294. clientkey = inicfg1.MustValue("kafka", "clientkey", "")
  295. cacert = inicfg1.MustValue("kafka", "cacert", "")
  296. ServiceName = inicfg1.MustValue("Service", "ServiceName", "KPTDataService")
  297. ServiceDisplayName = inicfg1.MustValue("Service", "ServiceDisplayName", "KPTDataService")
  298. ServiceDescription = inicfg1.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
  299. Offset = inicfg1.MustInt64("kafka", "Offset", sarama.OffsetOldest)
  300. //fmt.Println(cfg)
  301. Readini()
  302. config := sarama.NewConfig()
  303. if saslEnable {
  304. config.Net.SASL.Enable = true
  305. config.Net.SASL.User = username
  306. config.Net.SASL.Password = saslpassword
  307. }
  308. if tlsEnable {
  309. //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  310. tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
  311. if err != nil {
  312. log.Fatal(err)
  313. }
  314. config.Net.TLS.Enable = true
  315. config.Net.TLS.Config = tlsConfig
  316. }
  317. linkstr := kafka_host + ":" + strconv.Itoa(kafka_port)
  318. consumer, err := sarama.NewConsumer(strings.Split(linkstr, ","), config)
  319. defer consumer.Close()
  320. if err != nil {
  321. logger.Errorf(fmt.Sprintf("Failed to start consumer: %s", err))
  322. return
  323. }
  324. //设置分区
  325. partitionList, err := consumer.Partitions(kafka_topic)
  326. if err != nil {
  327. logger.Errorf("Failed to get the list of partitions: ", err)
  328. return
  329. }
  330. //循环分区
  331. for partition := range partitionList {
  332. func() {
  333. pc, err := consumer.ConsumePartition(kafka_topic, int32(partition), Offset)
  334. if err != nil {
  335. logger.Errorf("Failed to start consumer for partition %d: %s\n", partition, err)
  336. return
  337. }
  338. wg.Add(1)
  339. go func(pc sarama.PartitionConsumer) {
  340. testdb , er := GetmyDbsConnect(user, password, host, port, tableDB)
  341. defer testdb.Close()
  342. if er != nil {
  343. logger.Errorf("dbs can not connect %s\n",er)
  344. }else{
  345. for msg := range pc.Messages() {
  346. sqlstrmsg := string(msg.Value)
  347. //fmt.Println(string(sqlstrmsg))
  348. tx, err := testdb.Begin()
  349. if err != nil {
  350. fmt.Println("%q", err)
  351. }else {
  352. for _, sqlline := range strings.Split(sqlstrmsg, "\n") {
  353. err = ExecT(tx, sqlline)
  354. if err == nil {
  355. //logger.Infoln(fmt.Sprintf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), sqlline))
  356. } else {
  357. logger.Errorln(err.Error() + "---" + sqlline)
  358. }
  359. }
  360. }
  361. tx.Commit()
  362. logger.Infof(fmt.Sprintf("Partition:%d, Offset:%d, Key:%s \n\t", msg.Partition, msg.Offset, string(msg.Key)))
  363. Offset = msg.Offset + 1
  364. inicfg1.SetValue("kafka", "Offset", strconv.FormatInt(Offset, 10))
  365. goconfig.SaveConfigFile(inicfg1, CurrentPath+"config.ini")
  366. if Breaki >0 {
  367. break
  368. }
  369. }
  370. }
  371. defer pc.AsyncClose()
  372. defer wg.Done()
  373. }(pc)
  374. }()
  375. }
  376. wg.Wait()
  377. time.Sleep(1 * time.Second)
  378. logger.Errorln("错误重启")
  379. consumer.Close()
  380. }()
  381. }
  382. }
  383. func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
  384. // load client cert
  385. clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
  386. if err != nil {
  387. return nil, err
  388. }
  389. // load ca cert pool
  390. cacert, err := ioutil.ReadFile(cacertfile)
  391. if err != nil {
  392. return nil, err
  393. }
  394. cacertpool := x509.NewCertPool()
  395. cacertpool.AppendCertsFromPEM(cacert)
  396. // generate tlcconfig
  397. tlsConfig := tls.Config{}
  398. tlsConfig.RootCAs = cacertpool
  399. tlsConfig.Certificates = []tls.Certificate{clientcert}
  400. tlsConfig.BuildNameToCertificate()
  401. // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
  402. return &tlsConfig, err
  403. }