// Package kpt forwards queued log entries to Kafka and replays consumed
// Kafka messages into the local database.
package kpt

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	"github.com/Unknwon/goconfig"
	"github.com/siddontang/go-log/log"
)

// Kafka connection settings and consumer state. Kafka_Consumer reloads the
// connection values from the [kafka] section of config.ini on every pass.
var (
	kafka_host   string
	kafka_port   int
	kafka_topic  string
	saslEnable   bool
	username     string
	saslpassword string
	tlsEnable    bool
	clientcert   string
	clientkey    string
	cacert       string

	// Offset is the next offset to consume; Kafka_Consumer persists it to
	// config.ini after every message.
	Offset int64
	// Breaki, when set above zero, makes the partition goroutines stop after
	// the message they are currently handling.
	Breaki int

	// onetimerows is the batch size for Kafka_producerDB, kept as text and
	// parsed on every loop iteration.
	onetimerows string
	// onetimerowsMqtt is not referenced in this file.
	onetimerowsMqtt string
)

// Kafka_producer sends msgs as one message to the hard-coded topic
// "kafka_go_test" on the broker at 127.0.0.1:9092 and reports whether the
// broker acknowledged it.
//
// A connection failure is logged and reported as false. The previous
// log.Fatalf call terminated the process and left the "return false" that
// followed it unreachable.
func Kafka_producer(msgs string) bool {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Printf("unable to create kafka client: %q", err)
		return false
	}
	// A producer built from an existing client does not own it, so the client
	// has to be closed separately. Deferred first so it runs after the
	// producer's Close.
	defer client.Close()

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		fmt.Printf("producer_test create producer error :%s\n", err.Error())
		return false
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "kafka_go_test",
		Key:   sarama.StringEncoder("go_test"),
		Value: sarama.StringEncoder(msgs),
	})
	if err != nil {
		log.Printf("send message(%s) err=%s \n", msgs, err)
		return false
	}
	// msgs is passed as an argument rather than spliced into the format
	// string, so a '%' in the payload cannot corrupt the output.
	fmt.Fprintf(os.Stdout, "%s发送成功,partition=%d, offset=%d \n", msgs, partition, offset)
	return true
}

// Log pairs an id with a SQL string. It is not referenced in this file.
type Log struct {
	id  int64
	sql string
}

// LogCou holds a single count. It is not referenced in this file.
type LogCou struct {
	cou int64
}

// connectKafkaProducer creates a client for addrs and a sync producer on top
// of it, retrying once per second until both succeed or ctx is cancelled.
// ok is false only when ctx was cancelled first; client and producer are nil
// in that case.
func connectKafkaProducer(ctx context.Context, addrs []string, config *sarama.Config) (client sarama.Client, producer sarama.SyncProducer, ok bool) {
	for {
		c, err := sarama.NewClient(addrs, config)
		if err != nil {
			logger.Errorf("unable to create kafka client: %q \r\n", err)
		} else {
			p, perr := sarama.NewSyncProducerFromClient(c)
			if perr == nil {
				return c, p, true
			}
			logger.Errorf("unable to create kafka producer: %q \r\n", perr)
			if cerr := c.Close(); cerr != nil {
				logger.Errorf("close kafka client: %v \r\n", cerr)
			}
		}

		select {
		case <-ctx.Done():
			return nil, nil, false
		case <-time.After(time.Second):
		}
	}
}

// closeKafkaProducer closes producer and then client, logging any close
// error. Nil values are skipped.
func closeKafkaProducer(client sarama.Client, producer sarama.SyncProducer) {
	if producer != nil {
		if err := producer.Close(); err != nil {
			logger.Errorf("close kafka producer: %v \r\n", err)
		}
	}
	if client != nil {
		if err := client.Close(); err != nil {
			logger.Errorf("close kafka client: %v \r\n", err)
		}
	}
}

// popSent removes n delivered entries from the queue after a successful send.
//
// NOTE(review): the statements that performed this step were lost from the
// copy of the file under review. Both loops survived only as
// "for i := 0;i0 {", with everything between '<' and the next '>' missing.
// The loop bound and the call below are reconstructed from the surviving
// debug comment (" pop,err:= dqueuePop.PopOneIndex()"). The receiver
// DqueueIndex is assumed because the entries are peeked from it. Confirm the
// receiver, the method and the lost guard condition against the original
// source before merging.
func popSent(n int) {
	for i := 0; i < n; i++ {
		if _, err := DqueueIndex.PopOneIndex(); err != nil {
			logger.Errorf("pop sent message %d/%d: %v \r\n", i+1, n, err)
		}
	}
}

// Kafka_producerDB drains DqueueIndex into the Kafka topic kafka_topic until
// ctx is cancelled.
//
// Each iteration peeks up to onetimerows entries. It waits one second and
// retries when the queue is empty, or when the batch is short and the last
// successful send was under ten seconds ago. Otherwise it sends the batch and
// pops the sent entries on success. After more than ten send errors the
// producer and client are closed and re-created.
//
// Changes from the previous version: connection retries and the main loop
// both observe ctx, and the producer and client are closed on exit. The
// unreachable "empty batch" branch (with its malformed Errorf call) and the
// commented-out Bitcask implementation were removed as dead code.
func Kafka_producerDB(ctx context.Context) {
	fmt.Println("kafka 启动")

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	if saslEnable {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = username
		config.Net.SASL.Password = saslpassword
	}
	if tlsEnable {
		tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
		if err != nil {
			// Unusable TLS material is a configuration error; retrying will
			// not help.
			log.Fatal(err)
		}
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Version = sarama.V0_11_0_2
	addrs := []string{kafka_host + ":" + strconv.Itoa(kafka_port)}

	client, producer, ok := connectKafkaProducer(ctx, addrs, config)
	if !ok {
		fmt.Println("kafka 退出")
		return
	}
	// The closure reads client and producer at exit time, so it closes
	// whichever pair is current after any reconnect.
	defer func() { closeKafkaProducer(client, producer) }()

	errortimes := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Println("kafka 退出")
			return
		default:
		}

		if errortimes > 10 {
			errortimes = 0
			closeKafkaProducer(client, producer)
			client, producer, ok = connectKafkaProducer(ctx, addrs, config)
			if !ok {
				fmt.Println("kafka 退出")
				return
			}
		}

		// onetimerows is parsed on every pass, as before. A parse failure is
		// reported and the batch size falls back to 0.
		batch, err := strconv.Atoi(onetimerows)
		if err != nil {
			fmt.Println("转换onetimerows错误 \r\n", err.Error())
		}
		PopPeeks, err, full, null := DqueueIndex.PopPeeksV2(batch)
		if err != nil {
			fmt.Println("PopPeeks,err ", err)
		}
		if len(PopPeeks) == 0 {
			time.Sleep(time.Second)
			continue
		}
		// Hold back a short batch unless the last successful send was at
		// least ten seconds ago.
		if len(PopPeeks) < batch && time.Since(insertlog_lasttime) < 10*time.Second {
			time.Sleep(time.Second)
			continue
		}

		msgs := make([]*sarama.ProducerMessage, 0, len(PopPeeks))
		for _, raw := range PopPeeks {
			msgs = append(msgs, &sarama.ProducerMessage{
				Topic: kafka_topic,
				Key:   sarama.StringEncoder(""),
				Value: sarama.StringEncoder(string(raw)),
			})
		}

		if err := producer.SendMessages(msgs); err != nil {
			logger.Errorf("send message err=%s \r\n", err.Error())
			errortimes++
			continue
		}

		// The two branches mirror the surviving brace structure: the success
		// line is logged only in the else branch. See the NOTE(review) on
		// popSent; the branches may have differed in the lost text.
		if !full && null {
			popSent(len(PopPeeks))
		} else {
			popSent(len(PopPeeks))
			logger.Infof("发送成功 , 发送数 %d \r\n", len(msgs))
		}
		insertlog_lasttime = time.Now()
	}
}

var (
	// wg tracks the partition goroutines started by Kafka_Consumer.
	wg sync.WaitGroup
)

// Kafka_Consumer runs the consumer forever. Each pass reloads config.ini,
// consumes every partition of kafka_topic until all partition goroutines
// stop, and then starts over after a one-second pause. The pause now also
// applies when setup fails, where the previous version retried in a hot loop.
func Kafka_Consumer() {
	Breaki = 0
	for {
		runKafkaConsumer()
		time.Sleep(1 * time.Second)
	}
}

// runKafkaConsumer performs one consumer pass. It loads the settings,
// connects, starts one goroutine per partition and waits for all of them to
// finish.
func runKafkaConsumer() {
	// NOTE(review): the second result of GetCurrentPath is discarded, as in
	// the previous version. Confirm that ignoring it is intended.
	CurrentPath, _ = GetCurrentPath()
	inicfg1, err := goconfig.LoadConfigFile(CurrentPath + "config.ini")
	if err != nil {
		fmt.Println("读取配置文件失败[config.ini]")
		return
	}
	kafka_host = inicfg1.MustValue("kafka", "kafka_host", "127.0.0.1")
	kafka_port = inicfg1.MustInt("kafka", "kafka_port", 9092)
	kafka_topic = inicfg1.MustValue("kafka", "kafka_topic", "kafka_go_test")
	saslEnable = inicfg1.MustBool("kafka", "saslEnable", false)
	username = inicfg1.MustValue("kafka", "username", "root")
	saslpassword = inicfg1.MustValue("kafka", "saslpassword", "root")
	tlsEnable = inicfg1.MustBool("kafka", "tlsEnable", false)
	clientcert = inicfg1.MustValue("kafka", "clientcert", "")
	clientkey = inicfg1.MustValue("kafka", "clientkey", "")
	cacert = inicfg1.MustValue("kafka", "cacert", "")
	ServiceName = inicfg1.MustValue("Service", "ServiceName", "KPTDataService")
	ServiceDisplayName = inicfg1.MustValue("Service", "ServiceDisplayName", "KPTDataService")
	ServiceDescription = inicfg1.MustValue("Service", "ServiceDescription", "科湃腾数据同步")
	Offset = inicfg1.MustInt64("kafka", "Offset", sarama.OffsetOldest)
	Readini()

	config := sarama.NewConfig()
	if saslEnable {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = username
		config.Net.SASL.Password = saslpassword
	}
	if tlsEnable {
		tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
		if err != nil {
			log.Fatal(err)
		}
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}

	linkstr := kafka_host + ":" + strconv.Itoa(kafka_port)
	consumer, err := sarama.NewConsumer(strings.Split(linkstr, ","), config)
	if err != nil {
		logger.Errorf("Failed to start consumer: %s", err)
		return
	}
	// Deferred only after the error check: on failure consumer is nil, and
	// the previous unconditional defer panicked. It is also closed once now
	// instead of twice.
	defer consumer.Close()

	partitionList, err := consumer.Partitions(kafka_topic)
	if err != nil {
		logger.Errorf("Failed to get the list of partitions: %s", err)
		return
	}

	// Snapshot the start offset before any goroutine can move Offset, so
	// every partition starts from the configured value.
	// NOTE(review): one Offset is shared by all partitions. With more than
	// one partition the persisted value is whichever goroutine wrote last.
	startOffset := Offset
	var offsetMu sync.Mutex

	// Range over the partition IDs themselves. The previous loop ranged over
	// slice indices, which is only correct when the IDs are exactly 0..n-1.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition(kafka_topic, partition, startOffset)
		if err != nil {
			logger.Errorf("Failed to start consumer for partition %d: %s\n", partition, err)
			continue
		}
		wg.Add(1)
		go consumePartition(pc, inicfg1, &offsetMu)
	}

	wg.Wait()
	logger.Errorln("错误重启")
}

// consumePartition applies every message from pc to the database. Each
// message is split on newlines and each line is run through ExecT inside one
// transaction. After the commit the next offset is written to config.ini.
// offsetMu serialises the Offset update and the file write across the
// partition goroutines.
//
// The goroutine stops when the message channel closes, when Breaki is set
// above zero, or when the database cannot be reached or a transaction cannot
// be started. In the last case the offset is not advanced.
func consumePartition(pc sarama.PartitionConsumer, inicfg *goconfig.ConfigFile, offsetMu *sync.Mutex) {
	// Registered up front so they also run on the early-return paths.
	defer wg.Done()
	defer pc.AsyncClose()

	testdb, err := GetmyDbsConnect(user, password, host, port, tableDB)
	if err != nil {
		logger.Errorf("dbs can not connect %s\n", err)
		return
	}
	defer testdb.Close()

	for msg := range pc.Messages() {
		tx, err := testdb.Begin()
		if err != nil {
			// The previous version fell through to tx.Commit() on a nil
			// transaction here.
			logger.Errorf("begin transaction for partition %d offset %d: %s\n", msg.Partition, msg.Offset, err)
			return
		}
		for _, sqlline := range strings.Split(string(msg.Value), "\n") {
			// A failing line is logged and the rest of the message is still
			// applied, as before.
			if err := ExecT(tx, sqlline); err != nil {
				logger.Errorln(err.Error() + "---" + sqlline)
			}
		}
		if err := tx.Commit(); err != nil {
			logger.Errorf("commit partition %d offset %d: %s\n", msg.Partition, msg.Offset, err)
		}
		logger.Infof("Partition:%d, Offset:%d, Key:%s \n\t", msg.Partition, msg.Offset, string(msg.Key))

		offsetMu.Lock()
		Offset = msg.Offset + 1
		inicfg.SetValue("kafka", "Offset", strconv.FormatInt(Offset, 10))
		saveErr := goconfig.SaveConfigFile(inicfg, CurrentPath+"config.ini")
		offsetMu.Unlock()
		if saveErr != nil {
			logger.Errorf("save offset %d to config.ini: %s\n", msg.Offset+1, saveErr)
		}

		if Breaki > 0 {
			break
		}
	}
}

// genTLSConfig builds a client TLS configuration from a PEM certificate/key
// pair and a PEM CA bundle.
//
// It returns an error when the key pair cannot be loaded, the CA file cannot
// be read, or the CA file contains no parsable certificate. The last case
// used to be ignored and left the configuration with an empty root pool.
func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
	keyPair, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
	if err != nil {
		return nil, err
	}

	caPEM, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no PEM certificates found in CA file %q", cacertfile)
	}

	// InsecureSkipVerify stays off. Enable it only on a test server whose
	// host name does not match its certificate.
	return &tls.Config{
		RootCAs:      pool,
		Certificates: []tls.Certificate{keyPair},
	}, nil
}