package api import ( "bytes" "crypto/hmac" "crypto/sha1" "crypto/tls" "crypto/x509" "encoding/json" "fmt" "io/ioutil" "os" "strconv" "time" "tmr-watch/conf/setting" "tmr-watch/http/handle/restful" "tmr-watch/pkg/logging" "github.com/astaxie/beego/logs" MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/robfig/cron" ) func InitMqttClient() { if setting.YynserverSetting.FarmId != "" { c, pubTopic := MqttClient() deviceHeartbeat(c, pubTopic) mqttCron := cron.New() mqttCron.AddFunc("10 06 * * *", func() { feedtempletPush(c, pubTopic) stirPush(c, pubTopic) dustingPush(c, pubTopic) equipmentAccuracyPush(c, pubTopic) finishedWeightPush(c, pubTopic) CompletedTrainNumberPush(c, pubTopic) }) mqttCron.Start() } } func MqttClient() (MQTT.Client, string) { // set the device info, include product key, device name, and device secret // var productKey string = "a1NmXfrjL8M" // var deviceName string = "4776_p_breed" // var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4" // ProductKey // DeviceName // DeviceSecret var productKey string = setting.YynserverSetting.ProductKey var deviceName string = setting.YynserverSetting.DeviceName var deviceSecret string = setting.YynserverSetting.DeviceSecret // set timestamp, clientid, subscribe topic and publish topic var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10) var clientId string = "go_device_id_0001" var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get" var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post" // set the login broker url var raw_broker bytes.Buffer raw_broker.WriteString("tls://") raw_broker.WriteString(productKey) raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883") opts := MQTT.NewClientOptions().AddBroker(raw_broker.String()) // calculate the login auth info, and set it into the connection options auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp) opts.SetClientID(auth.mqttClientId) opts.SetUsername(auth.username) opts.SetPassword(auth.password) opts.SetMaxReconnectInterval(1 * time.Second) opts.AutoReconnect = true // opts.SetKeepAlive(60 * 2 * time.Second) opts.OnConnect = func(c MQTT.Client) { if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil { logging.Error("mqtt Subscribe err: ", token.Error()) os.Exit(1) } } // create and start a client using the above ClientOptions c := MQTT.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { logging.Error("mqtt Connect err: ", token.Error()) } fmt.Print("Connect aliyun IoT Cloud Sucess\n") // if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil { // fmt.Println(token.Error()) // os.Exit(1) // } // subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered // fmt.Println("Subscribe topic "+subTopic+" success\n", c.IsConnected()) // publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update") // for i := 0; i < 50; i++ { // fmt.Println("publish msg:", i) // text := fmt.Sprintf("ABC #%d", i) // token := c.Publish(pubTopic, 0, false, text) // fmt.Println("publish msg: ", text) // token.Wait() // time.Sleep(2 * time.Second) // } // unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get") // if token := c.Unsubscribe(subTopic); token.Wait() && token.Error() != nil { // fmt.Println(token.Error()) // os.Exit(1) // } // c.Disconnect(250) return c, pubTopic } func NewTLSConfig() *tls.Config { // Import trusted certificates from CAfile.pem. // Alternatively, manually add CA certificates to default openssl CA bundle. certpool := x509.NewCertPool() pemCerts, err := ioutil.ReadFile("./x509/root.pem") if err != nil { fmt.Println("0. read file error, game over!!") } certpool.AppendCertsFromPEM(pemCerts) // Create tls.Config with desired tls properties return &tls.Config{ // RootCAs = certs used to verify server cert. RootCAs: certpool, // ClientAuth = whether to request cert from server. // Since the server is set up for SSL, this happens // anyways. ClientAuth: tls.NoClientCert, // ClientCAs = certs used to validate client cert. ClientCAs: nil, // InsecureSkipVerify = verify that cert contents // match server. IP matches what is in cert etc. InsecureSkipVerify: false, // Certificates = list of certs client sends to server. // Certificates: []tls.Certificate{cert}, } } // define a function for the default message handler var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) } type AuthInfo struct { password, username, mqttClientId string } func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo { var raw_passwd bytes.Buffer raw_passwd.WriteString("clientId" + clientId) raw_passwd.WriteString("deviceName") raw_passwd.WriteString(deviceName) raw_passwd.WriteString("productKey") raw_passwd.WriteString(productKey) raw_passwd.WriteString("timestamp") raw_passwd.WriteString(timeStamp) fmt.Println(raw_passwd.String()) // hmac, use sha1 mac := hmac.New(sha1.New, []byte(deviceSecret)) mac.Write([]byte(raw_passwd.String())) password := fmt.Sprintf("%02x", mac.Sum(nil)) fmt.Println(password) username := deviceName + "&" + productKey var MQTTClientId bytes.Buffer MQTTClientId.WriteString(clientId) // hmac, use sha1; securemode=2 means TLS connection MQTTClientId.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=") MQTTClientId.WriteString(timeStamp) MQTTClientId.WriteString("|") auth := AuthInfo{password: password, username: username, mqttClientId: MQTTClientId.String()} return auth } func feedtempletPush(c MQTT.Client, pubTopic string) { tx := restful.Engine.NewSession() defer tx.Close() dataList, err := tx.SQL(`select f.id as recipeId , f.tname recipeName,ft.id ingId,ft.fname ingName,ft.fweight afQty, ft.sort mixNo,fd.allowratio allowableError ,fd.fclass ingType, ft.fweight * ( fd.dry /100 ) dmQty,null recipeCost from feedtemplet f join ftdetail ft on ft.ftid = f.id join feed fd on fd.id = ft.fid `).Query().List() if err != nil { logs.Error("feedtempletPush-error-1:", err) return } pushStr := `{ "apiId": "getKPTData", "param": { "farmId": %s, "method":"getfeedtempletinfo", "rowCount": "1", "resultData":%s } }` if len(dataList) > 0 { b, _ := json.Marshal(dataList) pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b)) // c.Publish(pubTopic, 2, false, pushStr) token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() // time.Sleep(2 * time.Second) } } func stirPush(c MQTT.Client, pubTopic string) { tx := restful.Engine.NewSession() defer tx.Close() dataList, err := tx.SQL(`SELECT d.mydate dropDate, d.projname tmrNo, d.times loadShift, d.tempid recipeId, d.templetname recipeName, f.feedcode ingId, d1.fname ingName, f.fclass ingType, f.dry dmPct, d1.sort mixNo, d1.feedallowratio allowableError, d1.lweight expWeight, d1.actualweightminus actualWeight, d1.begintime startTime, d1.intime endTime FROM downloadedplan d JOIN downloadplandtl1 d1 ON d1.pid = d.id JOIN feed f ON f.feedcode = d1.feedcode AND f.pastureid = d.pastureid WHERE DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List() if err != nil { logs.Error("feedtempletPush-error-1:", err) return } pushStr := `{ "apiId": "getKPTData", "param": { "farmId": %s, "method":"uploadadddata", "rowCount": "1", "resultData":%s } }` if len(dataList) > 0 { b, _ := json.Marshal(dataList) pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b)) token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() // time.Sleep(2 * time.Second) } } // 撒料信息 func dustingPush(c MQTT.Client, pubTopic string) { tx := restful.Engine.NewSession() defer tx.Close() dataList, err := tx.SQL(`SELECT d.mydate dropDate, d.projname tmrNo, d.times loadShift, d2.fbarid penId, b.bcode penName, d2.cowcount cowCount, d2.sort feedingNo, d2.lweight expWeight, d2.actualweightminus actualWeight, d2.begintime startTime, d2.intime endTime FROM downloadedplan d JOIN downloadplandtl2 d2 ON d2.pid = d.id JOIN bar b ON b.id = d2.fbarid WHERE DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List() if err != nil { logs.Error("feedtempletPush-error-1:", err) return } pushStr := `{ "apiId": "getKPTData", "param": { "farmId": %s, "method":"uploaddiliverdata", "rowCount": "1", "resultData":%s } }` if len(dataList) > 0 { b, _ := json.Marshal(dataList) pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b)) token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() // time.Sleep(2 * time.Second) } } //设备心跳 func deviceHeartbeat(c MQTT.Client, pubTopic string) { pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat) token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() // go func() { duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local) duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local) spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3)) // for { device := cron.New() device.AddFunc(spec1, func() { token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() }) // } device.Start() // }() } // 准确率 func equipmentAccuracyPush(c MQTT.Client, pubTopic string) { tx := restful.Engine.NewSession() defer tx.Close() dataList, err := tx.SQL(`select t.tname tname,1-abs(sum(d1.actualweightminus)- sum(d1.lweight))/sum(d1.lweight) rate ,d.mydate rateDate from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' ) group by d.datacaptureno`).Query().List() if err != nil { logs.Error("feedtempletPush-error-1:", err) return } pushStr := `{ "apiId": "getKPTData", "param": { "resultData": %s, "farmId": %s, "method": "uploadrate", "rowCount": %d } }` if len(dataList) > 0 { b, _ := json.Marshal(dataList) pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList)) token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() // time.Sleep(2 * time.Second) } } // 完成重量 func finishedWeightPush(c MQTT.Client, pubTopic string) { tx := restful.Engine.NewSession() defer tx.Close() dataList, err := tx.SQL(`select sum(d1.actualweightminus) CompleteWeught,sum(d1.lweight) planWeight,d.mydate weightDate from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )`).Query().List() if err != nil { logs.Error("feedtempletPush-error-1:", err) return } pushStr := `{ "apiId": "getKPTData", "param": { "resultData": %s, "farmId": %s, "method": "uploadweight", "rowCount": "1" } }` if len(dataList) > 0 { b, _ := json.Marshal(dataList) pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId) token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() // time.Sleep(2 * time.Second) } } // 完成车次 func CompletedTrainNumberPush(c MQTT.Client, pubTopic string) { tx := restful.Engine.NewSession() defer tx.Close() dataList, err := tx.SQL(`select (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) ) planCar, (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and iscompleted = 1 ) CompleteCar , DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) carDate `).Query().List() if err != nil { logs.Error("feedtempletPush-error-1:", err) return } pushStr := `{ "apiId": "getKPTData", "param": { "resultData": %s, "farmId": %s, "method": "uploadcarnumber", "rowCount": "1" } }` if len(dataList) > 0 { b, _ := json.Marshal(dataList) pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId) token := c.Publish(pubTopic, 2, false, pushStr) fmt.Println("publish msg: ", pushStr, token.Error()) // token.Wait() // time.Sleep(2 * time.Second) } } func feedHeatwatch(client MQTT.Client, msg MQTT.Message) { tx := restful.Engine.NewSession() defer tx.Close() data := make(map[string]interface{}) json.Unmarshal(msg.Payload(), &data) if _, ok := data["feedData"]; ok { for _, item := range data["feedData"].([]map[string]interface{}) { tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?) ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute() } } else if _, ok := data["barData"]; ok { for _, item := range data["barData"].([]map[string]interface{}) { tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?) ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute() } } }