123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- package kpt
- import (
- "context"
- "crypto/tls"
- "crypto/x509"
- "fmt"
- "github.com/Shopify/sarama"
- "github.com/Unknwon/goconfig"
- "github.com/siddontang/go-log/log"
- "io/ioutil"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- )
// Package-level configuration and state. The kafka_* / sasl* / tls* values
// are populated from config.ini by Kafka_Consumer (MustValue/MustInt calls
// there); the remainder is shared loop state.
var (
	kafka_host      string // Kafka broker host (config key kafka_host)
	kafka_port      int    // Kafka broker port (config key kafka_port)
	kafka_topic     string // topic to produce to / consume from
	saslEnable      bool   // enable SASL authentication on the broker connection
	username        string // SASL user name
	saslpassword    string // SASL password
	tlsEnable       bool   // enable TLS transport
	clientcert      string // path to client certificate (PEM)
	clientkey       string // path to client private key (PEM)
	cacert          string // path to CA certificate (PEM)
	Offset          int64  // consumer offset; persisted back to config.ini after each message
	Breaki          int    // non-zero requests the consumer message loop to stop
	onetimerows     string // batch size (kept as string, Atoi'd each loop) for Kafka_producerDB
	onetimerowsMqtt string // batch size for an MQTT path; not used in this file — TODO confirm
)
- func Kafka_producer(msgs string) bool{
- config := sarama.NewConfig()
- config.Producer.RequiredAcks = sarama.WaitForAll
- config.Producer.Partitioner = sarama.NewRandomPartitioner
- config.Producer.Return.Successes = true
- config.Producer.Return.Errors = true
- config.Version = sarama.V0_11_0_2
- var Address = []string{"127.0.0.1:9092"}
- client, err := sarama.NewClient(Address, config)
- if err != nil {
- log.Fatalf("unable to create kafka client: %q", err)
- return false
- }
- producer, err := sarama.NewSyncProducerFromClient(client)
- if err != nil {
- fmt.Printf("producer_test create producer error :%s\n", err.Error())
- return false
- }
- defer producer.Close()
- partition, offset,err := producer.SendMessage(&sarama.ProducerMessage{Topic: "kafka_go_test", Key: sarama.StringEncoder("go_test"), Value: sarama.StringEncoder(msgs)})
- if err!=nil {
- log.Printf("send message(%s) err=%s \n", msgs, err)
- return false
- }else {
- fmt.Fprintf(os.Stdout, msgs + "发送成功,partition=%d, offset=%d \n", partition, offset)
- }
- return true
- }
// Log pairs a row id with the SQL text it carries.
// No usage is visible in this file; presumably scanned from the "sqllog"
// database mentioned in the dead code of Kafka_producerDB — TODO confirm.
type Log struct {
	id  int64 // row identifier
	sql string // SQL statement text
}
// LogCou holds a single count value (e.g. a COUNT(*) scan target).
// No usage is visible in this file — TODO confirm.
type LogCou struct {
	cou int64 // count value
}
// Kafka_producerDB runs a produce loop until ctx is cancelled: it peeks up to
// `onetimerows` pending entries from DqueueIndex, publishes them as a batch to
// kafka_topic via a sarama SyncProducer, and on success pops the delivered
// entries from the queue. Broker connection failures are retried forever with
// a 1s backoff; after more than 10 consecutive send errors the client and
// producer are torn down and rebuilt.
func Kafka_producerDB(ctx context.Context) {
	//var id int
	//var ssql string
	//var mydbe *sql.DB
	//ssql = ""
	fmt.Println("kafka 启动") // runtime message: "kafka starting"
	errortimes := 0
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait for all in-sync replicas
	config.Producer.Partitioner = sarama.NewRandomPartitioner // random partition per message
	config.Producer.Return.Successes = true                   // required by SyncProducer
	config.Producer.Return.Errors = true
	if saslEnable {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = username
		config.Net.SASL.Password = saslpassword
	}
	if tlsEnable {
		//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
		tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
		if err != nil {
			log.Fatal(err) // fatal: cannot run with broken TLS material
		}
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Version = sarama.V0_11_0_2
	var Address = []string{kafka_host + ":" + strconv.Itoa(kafka_port)}
openKafka:
	// Reconnect target: retried with 1s backoff until both the client and
	// the sync producer are successfully created.
	client, err := sarama.NewClient(Address, config)
	if err != nil {
		logger.Errorf("unable to create kafka client: %q \r\n", err)
		time.Sleep(1 * time.Second)
		goto openKafka
	}
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		logger.Errorf("unable to create kafka producer: %q \r\n", err)
		time.Sleep(1 * time.Second)
		if client != nil {
			err = client.Close()
		}
		goto openKafka
	}
	// Historic dead code kept verbatim below (old direct-DB source path).
	/***
	openDb:
	if mydbe != nil {
	mydbe.Close()
	mydbe = nil
	}
	if deletestmt != nil {
	deletestmt.Close()
	deletestmt = nil
	}
	mydbe, err = GetmyDbsConnect(user , password, host, port, "sqllog")
	if err != nil {
	logger.Errorf("Error opening DBS :%s \n", err)
	time.Sleep(1 * time.Second)
	goto openDb
	}
	***/
	for {
		select {
		case <-ctx.Done():
			fmt.Println("kafka 退出") // runtime message: "kafka exiting"
			return
		default:
			// was: "parameter binding avoids SQL injection"
			if errortimes > 10 {
				// Too many consecutive send failures: rebuild client + producer.
				errortimes = 0
				if producer != nil {
					err = producer.Close()
				}
				if client != nil {
					err = client.Close()
				}
				goto openKafka
			}
			msgs := make([]*sarama.ProducerMessage, 0)
			// Batch size comes from config as a string; parse every iteration.
			int_, err := strconv.Atoi(onetimerows)
			if err == nil {
			} else {
				fmt.Println("转换onetimerows错误 \r\n", err.Error())
			}
			// Peek up to int_ entries without removing them. full/null describe
			// queue state; exact PopPeeksV2 semantics are defined elsewhere —
			// TODO confirm.
			PopPeeks, err, full, null := DqueueIndex.PopPeeksV2(int_)
			if err != nil {
				fmt.Println("PopPeeks,err ", err)
			}
			//fmt.Println("PopPeeks len ", len(PopPeeks),PopPeeks)
			if len(PopPeeks) == 0 {
				// Nothing pending: idle for a second.
				time.Sleep(time.Second)
				continue
			}
			if len(PopPeeks) < int_ && time.Since(insertlog_lasttime) < 10*time.Second {
				// Partial batch and the last send was <10s ago: wait for more.
				time.Sleep(time.Second)
				continue
			}
			//fmt.Println("PopPeeks len ", len(PopPeeks),string(PopPeeks[0]),string(PopPeeks[len(PopPeeks)-1]))
			for i := 0; i < len(PopPeeks); i++ {
				msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""),
					Value: sarama.StringEncoder(string(PopPeeks[i]))}
				msgs = append(msgs, msg)
			}
			if len(msgs) > 0 {
				err = producer.SendMessages(msgs)
				//fmt.Println("发送成功 ,发送数 ", len(msgs))
				if err != nil {
					logger.Errorf("send message err=%s \r\n", err.Error())
					errortimes++
				} else {
					if !full && null {
						// NOTE(review): pops len(msgs)+1 entries here versus
						// len(msgs) in the branch below — looks deliberate for
						// this full/null queue state, but confirm against
						// PopPeeksV2/PopOneIndex semantics.
						for i := 0; i < len(msgs)+1; i++ {
							_, _ = DqueueIndex.PopOneIndex()
							if showlog > 0 {
								//fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1",string(pop),err)
							}
							//
						}
					} else {
						for i := 0; i < len(msgs); i++ {
							_, _ = DqueueIndex.PopOneIndex()
							if showlog > 0 {
								//fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2",string(pop),err)
							}
						}
						// runtime message: "send succeeded, count %d"
						logger.Infof("发送成功 , 发送数 %d \r\n", len(msgs))
					}
					insertlog_lasttime = time.Now()
				}
			} else {
				// runtime message: "write pointer is empty".
				// NOTE(review): format string has no verb for err.
				logger.Errorf("写指针为空 \r\n", err)
			}
			// Historic dead code kept verbatim below (old Bitcask-backed path).
			//}else {
			//	logger.Errorf("打开文件为空 \r\n", err)
			//}
			//if SqlBitcask!= nil {
			//	currentWrite_, err := SqlBitcask.Get([]byte("currentWrite"))
			//	//fmt.Printf("发送成功currentWrite_ , id = %d, 发送数 %d \r\n",currentWrite_)
			//	if err == nil{
			//		currentRead := -1
			//		currentRead_, err := SqlBitcask.Get([]byte("currentRead"))
			//		//fmt.Printf("currentRead_ , id = %d, 发送数 %d \r\n",currentRead)
			//		if err == nil{
			//			currentRead = BytesToInt(currentRead_)
			//		}else {
			//			logger.Errorf("读currentRead错误 \r\n", err.Error())
			//		}
			//		currentWrite := BytesToInt(currentWrite_)
			//		sendcount := currentWrite-currentRead
			//		int_, err := strconv.Atoi(onetimerows)
			//		if err==nil{
			//			if (sendcount>int_){
			//				sendcount = int_
			//			}
			//		}else {
			//			logger.Errorf("转换onetimerows错误 \r\n", err.Error())
			//		}
			//		oldread := currentRead+1
			//		msgs := make([]*sarama.ProducerMessage, 0)
			//		for i := 0; i < sendcount; i++ {
			//			currentRead++
			//			sql_, err := SqlBitcask.Get(IntToBytes(currentRead))
			//			if err==nil{
			//				msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""), Value: sarama.StringEncoder(string(sql_[:]))}
			//				msgs = append(msgs, msg)
			//
			//			}else {
			//				logger.Errorf("读%d 数据错误 \r\n", currentRead, err.Error())
			//			}
			//		}
			//		if len(msgs)>0 {
			//			//err = producer.SendMessages(msgs)
			//
			//			if err != nil {
			//				logger.Errorf("send message err=%s \r\n", err.Error())
			//				errortimes ++
			//			} else {
			//				if showlog>0 {
			//					fmt.Printf("发送成功 , id = %d, 发送数 %d currentRead %d \r\n", id, len(msgs),currentRead)
			//				}
			//				logger.Infof("发送成功 , id = %d, 发送数 %d readfrom %d currentRead %d \r\n", id, len(msgs),oldread,currentRead)
			//
			//				err = SqlBitcask.Put([]byte("currentRead"), IntToBytes(currentRead))
			//				if err !=nil {
			//					logger.Errorf("写currentRead %d 错误 \r\n", currentRead, err.Error())
			//				}
			//				for i := 0; i < sendcount; i++ {
			//					err = SqlBitcask.Delete(IntToBytes(oldread+i))
			//					if err!=nil{
			//						logger.Errorf("删除read错误 %d \r\n", oldread+i, err.Error())
			//					}
			//				}
			//			}
			//		}
			//	}else{
			//		logger.Errorf("写指针为空 \r\n", err)
			//	}
			//}else {
			//	logger.Errorf("打开文件为空 \r\n", err)
			//}
			//time.Sleep(1 * time.Second)
		}
	}
}
// wg tracks the per-partition consumer goroutines started by Kafka_Consumer,
// so each connection round waits for all of them before reconnecting.
var (
	wg sync.WaitGroup
)
- func Kafka_Consumer() {
- //创建消费者
- Breaki = 0
- for ;; {
- func() {
- CurrentPath, _ = GetCurrentPath()
- inicfg1, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
- if err != nil {
- fmt.Println("读取配置文件失败[config.ini]")
- return
- }
- kafka_host = inicfg1.MustValue("kafka", "kafka_host", "127.0.0.1")
- kafka_port = inicfg1.MustInt("kafka", "kafka_port", 9092)
- kafka_topic = inicfg1.MustValue("kafka", "kafka_topic", "kafka_go_test")
- saslEnable = inicfg1.MustBool("kafka", "saslEnable", false)
- username = inicfg1.MustValue("kafka", "username", "root")
- saslpassword = inicfg1.MustValue("kafka", "saslpassword", "root")
- tlsEnable = inicfg1.MustBool("kafka", "tlsEnable", false)
- clientcert = inicfg1.MustValue("kafka", "clientcert", "")
- clientkey = inicfg1.MustValue("kafka", "clientkey", "")
- cacert = inicfg1.MustValue("kafka", "cacert", "")
- ServiceName = inicfg1.MustValue("Service", "ServiceName", "KPTDataService")
- ServiceDisplayName = inicfg1.MustValue("Service", "ServiceDisplayName", "KPTDataService")
- ServiceDescription = inicfg1.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
- Offset = inicfg1.MustInt64("kafka", "Offset", sarama.OffsetOldest)
- //fmt.Println(cfg)
- Readini()
- config := sarama.NewConfig()
- if saslEnable {
- config.Net.SASL.Enable = true
- config.Net.SASL.User = username
- config.Net.SASL.Password = saslpassword
- }
- if tlsEnable {
- //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
- tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
- if err != nil {
- log.Fatal(err)
- }
- config.Net.TLS.Enable = true
- config.Net.TLS.Config = tlsConfig
- }
- linkstr := kafka_host + ":" + strconv.Itoa(kafka_port)
- consumer, err := sarama.NewConsumer(strings.Split(linkstr, ","), config)
- defer consumer.Close()
- if err != nil {
- logger.Errorf(fmt.Sprintf("Failed to start consumer: %s", err))
- return
- }
- //设置分区
- partitionList, err := consumer.Partitions(kafka_topic)
- if err != nil {
- logger.Errorf("Failed to get the list of partitions: ", err)
- return
- }
- //循环分区
- for partition := range partitionList {
- func() {
- pc, err := consumer.ConsumePartition(kafka_topic, int32(partition), Offset)
- if err != nil {
- logger.Errorf("Failed to start consumer for partition %d: %s\n", partition, err)
- return
- }
- wg.Add(1)
- go func(pc sarama.PartitionConsumer) {
- testdb , er := GetmyDbsConnect(user, password, host, port, tableDB)
- defer testdb.Close()
- if er != nil {
- logger.Errorf("dbs can not connect %s\n",er)
- }else{
- for msg := range pc.Messages() {
- sqlstrmsg := string(msg.Value)
- //fmt.Println(string(sqlstrmsg))
- tx, err := testdb.Begin()
- if err != nil {
- fmt.Println("%q", err)
- }else {
- for _, sqlline := range strings.Split(sqlstrmsg, "\n") {
- err = ExecT(tx, sqlline)
- if err == nil {
- //logger.Infoln(fmt.Sprintf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), sqlline))
- } else {
- logger.Errorln(err.Error() + "---" + sqlline)
- }
- }
- }
- tx.Commit()
- logger.Infof(fmt.Sprintf("Partition:%d, Offset:%d, Key:%s \n\t", msg.Partition, msg.Offset, string(msg.Key)))
- Offset = msg.Offset + 1
- inicfg1.SetValue("kafka", "Offset", strconv.FormatInt(Offset, 10))
- goconfig.SaveConfigFile(inicfg1, CurrentPath+"config.ini")
- if Breaki >0 {
- break
- }
- }
- }
- defer pc.AsyncClose()
- defer wg.Done()
- }(pc)
- }()
- }
- wg.Wait()
- time.Sleep(1 * time.Second)
- logger.Errorln("错误重启")
- consumer.Close()
- }()
- }
- }
// genTLSConfig builds a *tls.Config for mutual TLS from the given client
// certificate/key pair and CA certificate files (all PEM encoded).
// It returns a non-nil error if any file cannot be read or parsed.
func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
	// load client cert
	clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
	if err != nil {
		return nil, err
	}
	// load ca cert pool
	cacert, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		return nil, err
	}
	cacertpool := x509.NewCertPool()
	// BUG FIX: the parse result was ignored; a malformed CA file silently
	// produced an empty root pool and opaque verification failures later.
	if !cacertpool.AppendCertsFromPEM(cacert) {
		return nil, fmt.Errorf("no CA certificates found in %q", cacertfile)
	}
	// generate tls config; the deprecated BuildNameToCertificate call was
	// dropped — since Go 1.14 certificate selection ignores NameToCertificate.
	tlsConfig := tls.Config{
		RootCAs:      cacertpool,
		Certificates: []tls.Certificate{clientcert},
	}
	// tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
	return &tlsConfig, nil
}
|