// Package api pushes TMR feeding statistics to the Aliyun IoT platform over
// MQTT and applies feed / pen master data received from it.
package api

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha1"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"strconv"
	"time"

	"tmr-watch/conf/setting"
	"tmr-watch/http/handle/restful"
	"tmr-watch/pkg/logging"

	"github.com/astaxie/beego/logs"
	MQTT "github.com/eclipse/paho.mqtt.golang"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/robfig/cron"
)

// mqttPublishTimeout bounds how long a publish waits for the broker to finish
// the QoS 2 exchange before the attempt is reported as failed.
const mqttPublishTimeout = 10 * time.Second

// InitMqttClient connects to the broker, starts the heartbeat and schedules
// the statistics pushes. It does nothing when no farm id is configured.
func InitMqttClient() {
	if setting.YynserverSetting.FarmId == "" {
		return
	}

	c, pubTopic := MqttClient()
	deviceHeartbeat(c, pubTopic)

	mqttCron := cron.New()
	// NOTE(review): this is a 5-field spec. Depending on the vendored
	// robfig/cron version the first field may be parsed as seconds (firing at
	// hh:06:10 every hour) rather than minutes (06:10 once a day) — confirm
	// which schedule is intended.
	mqttCron.AddFunc("10 06 * * *", func() {
		// Each run reports the previous calendar day.
		now := time.Now().AddDate(0, 0, -1).Format("2006-01-02")
		stirPush(c, pubTopic, now)
		dustingPush(c, pubTopic, now)
		equipmentAccuracyPush(c, pubTopic, now)
		finishedWeightPush(c, pubTopic, now)
		feedtempletPush(c, pubTopic)
		CompletedTrainNumberPush(c, pubTopic, now)
	})
	mqttCron.Start()
}

// MqttClient builds an MQTT client from the configured product key, device
// name and device secret, connects it and returns the connected client
// together with the topic that pushes are published to.
//
// The process exits with status 1 when the initial connect fails or when the
// subscription made in the on-connect handler fails.
func MqttClient() (MQTT.Client, string) {
	productKey := setting.YynserverSetting.ProductKey
	deviceName := setting.YynserverSetting.DeviceName
	deviceSecret := setting.YynserverSetting.DeviceSecret

	timeStamp := strconv.FormatInt(time.Now().UnixNano(), 10)
	clientID := "go" + setting.YynserverSetting.FarmId

	topicBase := "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/"
	subTopic := topicBase + "get"
	pubTopic := topicBase + "post"

	// NOTE(review): the broker URL is plain TCP on port 1883 while the client
	// id built by calculate_sign advertises securemode=2, and NewTLSConfig is
	// not wired into these options — confirm the intended transport.
	opts := MQTT.NewClientOptions().AddBroker("tcp://iot-010a5xth.mqtt.iothub.aliyuncs.com:1883")

	// Derive the login credentials and install them on the options.
	auth := calculate_sign(clientID, productKey, deviceName, deviceSecret, timeStamp)
	opts.SetClientID(auth.mqttClientId)
	opts.SetUsername(auth.username)
	opts.SetPassword(auth.password)
	opts.SetMaxReconnectInterval(1 * time.Second)
	opts.AutoReconnect = true

	// Subscribe from the on-connect handler so the subscription is issued on
	// every (re)connect.
	opts.OnConnect = func(c MQTT.Client) {
		if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
			logging.Error("mqtt Subscribe err: ", token.Error())
			os.Exit(1)
		}
	}

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}
	fmt.Print("Connect aliyun IoT Cloud Sucess\n")

	return c, pubTopic
}

// NewTLSConfig returns a TLS configuration whose root pool is loaded from
// ./x509/root.pem. When the file cannot be read the failure is printed and a
// configuration with an empty root pool is returned.
func NewTLSConfig() *tls.Config {
	certpool := x509.NewCertPool()
	pemCerts, err := ioutil.ReadFile("./x509/root.pem")
	if err != nil {
		fmt.Println("0. read file error, game over!!", err)
	}
	certpool.AppendCertsFromPEM(pemCerts)

	return &tls.Config{
		RootCAs:            certpool,
		ClientAuth:         tls.NoClientCert,
		ClientCAs:          nil,
		InsecureSkipVerify: false,
	}
}

// f is a message handler that prints the topic and payload of each message.
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}

// AuthInfo holds the credentials derived by calculate_sign.
type AuthInfo struct {
	password, username, mqttClientId string
}

// calculate_sign derives the MQTT login credentials for a device.
//
// The password is the hex-encoded HMAC-SHA1, keyed with deviceSecret, of
// "clientId<clientId>deviceName<deviceName>productKey<productKey>timestamp<timeStamp>".
// The username is "<deviceName>&<productKey>" and the MQTT client id is
// clientId followed by the connection parameters.
//
// Neither the signed content nor the resulting password is printed: the
// password is a usable login credential and must not end up in logs.
func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
	var signContent bytes.Buffer
	signContent.WriteString("clientId" + clientId)
	signContent.WriteString("deviceName" + deviceName)
	signContent.WriteString("productKey" + productKey)
	signContent.WriteString("timestamp" + timeStamp)

	mac := hmac.New(sha1.New, []byte(deviceSecret))
	mac.Write(signContent.Bytes())
	password := fmt.Sprintf("%02x", mac.Sum(nil))

	var mqttClientID bytes.Buffer
	mqttClientID.WriteString(clientId)
	mqttClientID.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
	mqttClientID.WriteString(timeStamp)
	mqttClientID.WriteString("|")

	return AuthInfo{
		password:     password,
		username:     deviceName + "&" + productKey,
		mqttClientId: mqttClientID.String(),
	}
}

// publishAndWait publishes payload to topic at QoS 2 and waits, for at most
// mqttPublishTimeout, for the publish to complete. A token's error is only
// meaningful once the token has completed, so the wait is what makes a
// failed publish observable.
func publishAndWait(c MQTT.Client, topic, payload string) error {
	token := c.Publish(topic, 2, false, payload)
	if !token.WaitTimeout(mqttPublishTimeout) {
		return fmt.Errorf("mqtt publish to %s: not completed within %v", topic, mqttPublishTimeout)
	}
	return token.Error()
}

// publishAndLog publishes payload to topic and prints the payload together
// with the outcome of the publish.
func publishAndLog(c MQTT.Client, topic, payload string) {
	err := publishAndWait(c, topic, payload)
	fmt.Println("publish msg: ", payload, err)
}

// feedtempletPush publishes the recipe (feed templet) rows with method
// "getfeedtempletinfo". Nothing is published when the query returns no rows.
func feedtempletPush(c MQTT.Client, pubTopic string) {
	tx := restful.Engine.NewSession()
	defer tx.Close()

	dataList, err := tx.SQL(`SELECT
		f.id AS recipeId,
		f.tname recipeName,
		ft.id ingId,
		ifnull(fd.fname,fdy.fname) ingName,
		if(fd.fname is not null, ft.fweight,fty.fweight) afQty,
		ft.sort mixNo,
		ifnull(fd.allowratio,fdy.allowratio) allowableError,
		ifnull(fd.fclass,fdy.fclass) ingType,
		if(fd.fname is not null,ft.fweight * ( fd.dry / 100 ), fty.fweight * ( fdy.dry / 100 )) dmQty,
		'' recipeCost
	FROM
		feedtemplet f
		JOIN ftdetail ft ON ft.ftid = f.id
		left JOIN feed fd ON fd.id = ft.fid
		left join feedtemplet fy on fy.id = ft.preftid
		left join ftdetail fty on fty.ftid = fy.id
		left JOIN feed fdy ON fdy.id = fty.fid
	`).QueryString()
	if err != nil {
		logs.Error("feedtempletPush-error-1:", err)
		return
	}
	if len(dataList) == 0 {
		return
	}

	b, err := json.Marshal(dataList)
	if err != nil {
		logs.Error("feedtempletPush-error-2:", err)
		return
	}

	// NOTE(review): rowCount is the fixed string "1" here even when several
	// rows are sent — confirm against the receiving API.
	const tmpl = `{ "apiId": "getKPTData", "param": { "farmId": "%s", "method":"getfeedtempletinfo", "rowCount": "1", "resultData":%s } }`
	publishAndLog(c, pubTopic, fmt.Sprintf(tmpl, setting.YynserverSetting.FarmId, string(b)))
}

// stirPush publishes the ingredient loading rows of the plans dated date
// (formatted as 2006-01-02) with method "uploadadddata". Nothing is published
// when the query returns no rows.
func stirPush(c MQTT.Client, pubTopic, date string) {
	tx := restful.Engine.NewSession()
	defer tx.Close()

	dataList, err := tx.SQL(`SELECT
		DATE_FORMAT(d.mydate,'%Y-%m-%d') loadDate,
		d.sort tmrNo,
		d.times loadShift,
		d.tempid recipeId,
		d.templetname recipeName,
		f.feedcode ingId,
		d1.fname ingName,
		12 ingType,
		f.dry dmPct,
		d1.sort mixNo,
		d1.feedallowratio allowable_error,
		d1.lweight expWeight,
		d1.actualweightminus actualWeight,
		if((select count(1) from downloadplandtl1 where pid = d.id and sort < d1.sort order by sort desc) >0 ,(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl1 where pid = d.id and sort < d1.sort order by sort desc limit 1),DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') ) startTime,
		DATE_FORMAT(d1.intime,'%Y-%m-%d %H:%i:%s') endTime ,
		ifnull(if(d.driverId !=0 ,(select drivername from driver where id = d.driverId),(SELECT dr.driver FROM dutyrecord dr WHERE dr.pastureid = d.pastureid AND dr.eqid = d.tmrid and dr.times= d.times AND dr.operatetime <=d.mydate ORDER BY dr.operatetime DESC LIMIT 1)),'')tmrName
	FROM
		downloadedplan d
		JOIN downloadplandtl1 d1 ON d1.pid = d.id
		JOIN feed f ON f.feedcode = d1.feedcode
		AND f.pastureid = d.pastureid
	WHERE
		DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = ?
	`, date).QueryString()
	if err != nil {
		logs.Error("stirPush-error-1:", err)
		return
	}
	if len(dataList) == 0 {
		return
	}

	b, err := json.Marshal(dataList)
	if err != nil {
		logs.Error("stirPush-error-2:", err)
		return
	}

	const tmpl = `{ "apiId": "getKPTData", "param": { "farmId": "%s", "method":"uploadadddata", "rowCount": "%d", "resultData":%s } }`
	publishAndLog(c, pubTopic, fmt.Sprintf(tmpl, setting.YynserverSetting.FarmId, len(dataList), string(b)))
}

// dustingPush publishes the feed distribution (drop) rows of the plans dated
// date (formatted as 2006-01-02) with method "uploaddiliverdata". Nothing is
// published when the query returns no rows.
func dustingPush(c MQTT.Client, pubTopic, date string) {
	tx := restful.Engine.NewSession()
	defer tx.Close()

	dataList, err := tx.SQL(`SELECT
		ifnull(if(d.driverId !=0 ,(select drivername from driver where id = d.driverId),(SELECT dr.driver FROM dutyrecord dr WHERE dr.pastureid = d.pastureid AND dr.eqid = d.tmrid and dr.times= d.times AND dr.operatetime <=d.mydate ORDER BY dr.operatetime DESC LIMIT 1)),'')tmrName ,
		DATE_FORMAT(d.mydate,'%Y-%m-%d') dropDate,
		d.sort tmrNo,
		d.times dropShift,
		d2.fbarid penId,
		b.bname penName,
		fp.ccount cowCount,
		d2.sort feedingNo,
		d2.lweight expWeight,
		d2.actualweightminus actualWeight,
		if((select count(1) from downloadplandtl2 where pid = d.id and sort < d2.sort order by sort desc) >0 ,(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl2 where pid = d.id and sort < d2.sort order by sort desc limit 1), (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl1 where pid = d.id order by sort desc limit 1) ) startTime,
		DATE_FORMAT(d2.intime,'%Y-%m-%d %H:%i:%s') endTime
	FROM
		downloadedplan d
		JOIN downloadplandtl2 d2 ON d2.pid = d.id
		JOIN bar b ON b.id = d2.fbarid
		join feedp fp on fp.barid = b.id
		join tmr t on t.id = d.tmrid
		left join driver on driver.drivercode = t.eqcode
	WHERE
		DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = ?
	`, date).QueryString()
	if err != nil {
		logs.Error("dustingPush-error-1:", err)
		return
	}
	if len(dataList) == 0 {
		return
	}

	b, err := json.Marshal(dataList)
	if err != nil {
		logs.Error("dustingPush-error-2:", err)
		return
	}

	const tmpl = `{ "apiId": "getKPTData", "param": { "farmId": "%s", "method":"uploaddiliverdata", "rowCount": "%d", "resultData":%s } }`
	publishAndLog(c, pubTopic, fmt.Sprintf(tmpl, setting.YynserverSetting.FarmId, len(dataList), string(b)))
}

// deviceHeartbeat publishes the heartbeat message once immediately and then
// schedules the same message to be published every minute.
func deviceHeartbeat(c MQTT.Client, pubTopic string) {
	// NOTE(review): the HeartBeat setting is inserted unquoted, so the payload
	// is valid JSON only if the configured value is itself a JSON value —
	// verify against the configuration.
	pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
	publishAndLog(c, pubTopic, pushStr)

	// time.Minute formats as "1m0s", giving the spec "@every 1m0s".
	spec := fmt.Sprintf("@every %v", time.Minute)

	device := cron.New()
	device.AddFunc(spec, func() {
		err := publishAndWait(c, pubTopic, pushStr)
		fmt.Println("publish msg: ", pushStr, err, time.Now())
	})
	device.Start()
}

// equipmentAccuracyPush publishes the per-equipment loading accuracy of the
// plans dated date (formatted as 2006-01-02) with method "uploadrate".
// Nothing is published when the query returns no rows.
func equipmentAccuracyPush(c MQTT.Client, pubTopic, date string) {
	tx := restful.Engine.NewSession()
	defer tx.Close()

	dataList, err := tx.SQL(`SELECT
		t.tname Name,
		1-abs ( sum( d1.actualweightminus )- sum( d1.lweight ))/ sum( d1.lweight ) Rate,
		d.mydate RateDate ,
		DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') startTime,
		(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl2 where pid = d.id order by sort desc limit 1) endTime
	FROM
		downloadedplan d
		JOIN downloadplandtl1 d1 ON d1.pid = d.id
		JOIN tmr t ON t.datacaptureno = d.datacaptureno
	WHERE
		DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = ?
		and d.lpplantype in(0,1)
	GROUP BY
		d.datacaptureno`, date).QueryString()
	if err != nil {
		logs.Error("equipmentAccuracyPush-error-1:", err)
		return
	}
	if len(dataList) == 0 {
		return
	}

	b, err := json.Marshal(dataList)
	if err != nil {
		logs.Error("equipmentAccuracyPush-error-2:", err)
		return
	}

	const tmpl = `{ "apiId": "getKPTData", "param": { "resultData": %s, "farmId": "%s", "method": "uploadrate", "rowCount": %d } }`
	publishAndLog(c, pubTopic, fmt.Sprintf(tmpl, string(b), setting.YynserverSetting.FarmId, len(dataList)))
}

// finishedWeightPush publishes the planned and completed weight totals of the
// plans dated date (formatted as 2006-01-02) with method "uploadweight".
// Nothing is published when the query returns no rows.
func finishedWeightPush(c MQTT.Client, pubTopic, date string) {
	tx := restful.Engine.NewSession()
	defer tx.Close()

	dataList, err := tx.SQL(`SELECT
		sum( d1.actualweightminus ) completeWeight,
		sum( d1.lweight ) planWeight,
		d.mydate weightDate ,
		DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') startTime,
		(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where mydate = d.mydate and intime is not null order by sort desc limit 1) endTime
	FROM
		downloadedplan d
		JOIN downloadplandtl1 d1 ON d1.pid = d.id
		JOIN tmr t ON t.datacaptureno = d.datacaptureno
	WHERE
		DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = ?
		and lpplantype in(0,1)`, date).QueryString()
	if err != nil {
		logs.Error("finishedWeightPush-error-1:", err)
		return
	}
	if len(dataList) == 0 {
		return
	}

	b, err := json.Marshal(dataList)
	if err != nil {
		logs.Error("finishedWeightPush-error-2:", err)
		return
	}

	const tmpl = `{ "apiId": "getKPTData", "param": { "resultData": %s, "farmId": "%s", "method": "uploadweight", "rowCount": "1" } }`
	publishAndLog(c, pubTopic, fmt.Sprintf(tmpl, string(b), setting.YynserverSetting.FarmId))
}

// CompletedTrainNumberPush publishes the planned and completed plan counts
// for date (formatted as 2006-01-02) with method "uploadcarnumber". Nothing
// is published when the query returns no rows.
func CompletedTrainNumberPush(c MQTT.Client, pubTopic, date string) {
	tx := restful.Engine.NewSession()
	defer tx.Close()

	dataList, err := tx.SQL(`
	select
		(select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and lpplantype in(0,1) ) planCar,
		(select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1)) CompleteCar ,
		? carDate,
		(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1) order by intime asc limit 1 ) startTime,
		(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1) and intime is not null order by intime desc limit 1 ) endTime
	`, date, date, date, date, date).QueryString()
	if err != nil {
		logs.Error("CompletedTrainNumberPush-error-1:", err)
		return
	}
	if len(dataList) == 0 {
		return
	}

	b, err := json.Marshal(dataList)
	if err != nil {
		logs.Error("CompletedTrainNumberPush-error-2:", err)
		return
	}

	const tmpl = `{ "apiId": "getKPTData", "param": { "resultData": %s, "farmId": "%s", "method": "uploadcarnumber", "rowCount": "1" } }`
	publishAndLog(c, pubTopic, fmt.Sprintf(tmpl, string(b), setting.YynserverSetting.FarmId))
}

// jsonObjects returns the JSON objects contained in v, where v is a value
// decoded by encoding/json into an interface{}. encoding/json decodes arrays
// into []interface{} and objects into map[string]interface{}, so both levels
// are asserted individually; elements that are not objects are skipped and a
// non-array v yields nil.
func jsonObjects(v interface{}) []map[string]interface{} {
	list, ok := v.([]interface{})
	if !ok {
		return nil
	}
	objs := make([]map[string]interface{}, 0, len(list))
	for _, elem := range list {
		if obj, ok := elem.(map[string]interface{}); ok {
			objs = append(objs, obj)
		}
	}
	return objs
}

// feedHeatwatch handles messages on the subscribed topic. A payload carrying
// "feedData" upserts each entry into the feed table; otherwise a payload
// carrying "barData" upserts each entry into the bar table. The pastureid of
// a new row is taken from the column default of recweight.pastureid.
//
// Malformed payloads and failed statements are logged and skipped so that a
// bad message cannot panic the MQTT client's callback.
func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
	tx := restful.Engine.NewSession()
	defer tx.Close()

	data := make(map[string]interface{})
	if err := json.Unmarshal(msg.Payload(), &data); err != nil {
		logs.Error("feedHeatwatch-error-1:", err)
		return
	}

	if feeds, ok := data["feedData"]; ok {
		for _, item := range jsonObjects(feeds) {
			// The scalar subquery must not carry an INTO clause: a nested
			// SELECT has to return its value to the enclosing statement.
			_, err := tx.SQL(`
			insert into feed(pastureid,feedcode,fname)values((SELECT column_default FROM information_schema.COLUMNS WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
			ON DUPLICATE KEY UPDATE feedcode = ?,fname = ?
			`, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
			if err != nil {
				logs.Error("feedHeatwatch-error-2:", err)
			}
		}
	} else if bars, ok := data["barData"]; ok {
		for _, item := range jsonObjects(bars) {
			_, err := tx.SQL(`
			insert into bar(pastureid,bcode,bname)values((SELECT column_default FROM information_schema.COLUMNS WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
			ON DUPLICATE KEY UPDATE bcode = ?,bname = ?
			`, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
			if err != nil {
				logs.Error("feedHeatwatch-error-3:", err)
			}
		}
	}
}