123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 |
- package api
- import (
- "bytes"
- "crypto/hmac"
- "crypto/sha1"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "time"
- "tmr-watch/conf/setting"
- "tmr-watch/http/handle/restful"
- "tmr-watch/pkg/logging"
- "github.com/astaxie/beego/logs"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/robfig/cron"
- )
- func InitMqttClient() {
- if setting.YynserverSetting.FarmId != "" {
- c, pubTopic := MqttClient()
- deviceHeartbeat(c, pubTopic)
- mqttCron := cron.New()
- mqttCron.AddFunc("10 06 * * *", func() {
- feedtempletPush(c, pubTopic)
- stirPush(c, pubTopic)
- dustingPush(c, pubTopic)
- equipmentAccuracyPush(c, pubTopic)
- finishedWeightPush(c, pubTopic)
- CompletedTrainNumberPush(c, pubTopic)
- })
- mqttCron.Start()
- }
- }
- func MqttClient() (MQTT.Client, string) {
- // set the device info, include product key, device name, and device secret
- // var productKey string = "a1NmXfrjL8M"
- // var deviceName string = "4776_p_breed"
- // var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4"
- // ProductKey
- // DeviceName
- // DeviceSecret
- var productKey string = setting.YynserverSetting.ProductKey
- var deviceName string = setting.YynserverSetting.DeviceName
- var deviceSecret string = setting.YynserverSetting.DeviceSecret
- // set timestamp, clientid, subscribe topic and publish topic
- var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)
- var clientId string = "go_device_id_0001"
- var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"
- var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"
- // set the login broker url
- var raw_broker bytes.Buffer
- raw_broker.WriteString("tls://")
- raw_broker.WriteString(productKey)
- raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")
- opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
- // calculate the login auth info, and set it into the connection options
- auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
- opts.SetClientID(auth.mqttClientId)
- opts.SetUsername(auth.username)
- opts.SetPassword(auth.password)
- opts.SetMaxReconnectInterval(1 * time.Second)
- opts.AutoReconnect = true
- // opts.SetKeepAlive(60 * 2 * time.Second)
- opts.OnConnect = func(c MQTT.Client) {
- if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
- logging.Error("mqtt Subscribe err: ", token.Error())
- os.Exit(1)
- }
- }
- // create and start a client using the above ClientOptions
- c := MQTT.NewClient(opts)
- if token := c.Connect(); token.Wait() && token.Error() != nil {
- logging.Error("mqtt Connect err: ", token.Error())
- }
- fmt.Print("Connect aliyun IoT Cloud Sucess\n")
- // if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
- // fmt.Println(token.Error())
- // os.Exit(1)
- // }
- // subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered
- // fmt.Println("Subscribe topic "+subTopic+" success\n", c.IsConnected())
- // publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update")
- // for i := 0; i < 50; i++ {
- // fmt.Println("publish msg:", i)
- // text := fmt.Sprintf("ABC #%d", i)
- // token := c.Publish(pubTopic, 0, false, text)
- // fmt.Println("publish msg: ", text)
- // token.Wait()
- // time.Sleep(2 * time.Second)
- // }
- // unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get")
- // if token := c.Unsubscribe(subTopic); token.Wait() && token.Error() != nil {
- // fmt.Println(token.Error())
- // os.Exit(1)
- // }
- // c.Disconnect(250)
- return c, pubTopic
- }
- func NewTLSConfig() *tls.Config {
- // Import trusted certificates from CAfile.pem.
- // Alternatively, manually add CA certificates to default openssl CA bundle.
- certpool := x509.NewCertPool()
- pemCerts, err := ioutil.ReadFile("./x509/root.pem")
- if err != nil {
- fmt.Println("0. read file error, game over!!")
- }
- certpool.AppendCertsFromPEM(pemCerts)
- // Create tls.Config with desired tls properties
- return &tls.Config{
- // RootCAs = certs used to verify server cert.
- RootCAs: certpool,
- // ClientAuth = whether to request cert from server.
- // Since the server is set up for SSL, this happens
- // anyways.
- ClientAuth: tls.NoClientCert,
- // ClientCAs = certs used to validate client cert.
- ClientCAs: nil,
- // InsecureSkipVerify = verify that cert contents
- // match server. IP matches what is in cert etc.
- InsecureSkipVerify: false,
- // Certificates = list of certs client sends to server.
- // Certificates: []tls.Certificate{cert},
- }
- }
- // define a function for the default message handler
- var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
- fmt.Printf("TOPIC: %s\n", msg.Topic())
- fmt.Printf("MSG: %s\n", msg.Payload())
- }
- type AuthInfo struct {
- password, username, mqttClientId string
- }
- func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
- var raw_passwd bytes.Buffer
- raw_passwd.WriteString("clientId" + clientId)
- raw_passwd.WriteString("deviceName")
- raw_passwd.WriteString(deviceName)
- raw_passwd.WriteString("productKey")
- raw_passwd.WriteString(productKey)
- raw_passwd.WriteString("timestamp")
- raw_passwd.WriteString(timeStamp)
- fmt.Println(raw_passwd.String())
- // hmac, use sha1
- mac := hmac.New(sha1.New, []byte(deviceSecret))
- mac.Write([]byte(raw_passwd.String()))
- password := fmt.Sprintf("%02x", mac.Sum(nil))
- fmt.Println(password)
- username := deviceName + "&" + productKey
- var MQTTClientId bytes.Buffer
- MQTTClientId.WriteString(clientId)
- // hmac, use sha1; securemode=2 means TLS connection
- MQTTClientId.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
- MQTTClientId.WriteString(timeStamp)
- MQTTClientId.WriteString("|")
- auth := AuthInfo{password: password, username: username, mqttClientId: MQTTClientId.String()}
- return auth
- }
- func feedtempletPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select f.id as recipeId , f.tname recipeName,ft.id ingId,ft.fname ingName,ft.fweight afQty,
- ft.sort mixNo,fd.allowratio allowableError ,fd.fclass ingType, ft.fweight * ( fd.dry /100 ) dmQty,null recipeCost
- from feedtemplet f join ftdetail ft on ft.ftid = f.id join feed fd on fd.id = ft.fid
- `).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": %s,
- "method":"getfeedtempletinfo",
- "rowCount": "1",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
- // c.Publish(pubTopic, 2, false, pushStr)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- func stirPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- d.mydate dropDate,
- d.projname tmrNo,
- d.times loadShift,
- d.tempid recipeId,
- d.templetname recipeName,
- f.feedcode ingId,
- d1.fname ingName,
- f.fclass ingType,
- f.dry dmPct,
- d1.sort mixNo,
- d1.feedallowratio allowableError,
- d1.lweight expWeight,
- d1.actualweightminus actualWeight,
- d1.begintime startTime,
- d1.intime endTime
- FROM
- downloadedplan d
- JOIN downloadplandtl1 d1 ON d1.pid = d.id
- JOIN feed f ON f.feedcode = d1.feedcode
- AND f.pastureid = d.pastureid
- WHERE
- DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": %s,
- "method":"uploadadddata",
- "rowCount": "1",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- // 撒料信息
- func dustingPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- d.mydate dropDate,
- d.projname tmrNo,
- d.times loadShift,
- d2.fbarid penId,
- b.bcode penName,
- d2.cowcount cowCount,
- d2.sort feedingNo,
- d2.lweight expWeight,
- d2.actualweightminus actualWeight,
- d2.begintime startTime,
- d2.intime endTime
- FROM
- downloadedplan d
- JOIN downloadplandtl2 d2 ON d2.pid = d.id
- JOIN bar b ON b.id = d2.fbarid
- WHERE
- DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": %s,
- "method":"uploaddiliverdata",
- "rowCount": "1",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- //设备心跳
- func deviceHeartbeat(c MQTT.Client, pubTopic string) {
- pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // go func() {
- duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)
- duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)
- spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))
- // for {
- device := cron.New()
- device.AddFunc(spec1, func() {
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- })
- // }
- device.Start()
- // }()
- }
- // 准确率
- func equipmentAccuracyPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select t.tname tname,1-abs(sum(d1.actualweightminus)- sum(d1.lweight))/sum(d1.lweight) rate ,d.mydate rateDate from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno
- where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )
- group by d.datacaptureno`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": %s,
- "method": "uploadrate",
- "rowCount": %d
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- // 完成重量
- func finishedWeightPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select sum(d1.actualweightminus) CompleteWeught,sum(d1.lweight) planWeight,d.mydate weightDate
- from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno
- where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": %s,
- "method": "uploadweight",
- "rowCount": "1"
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- // 完成车次
- func CompletedTrainNumberPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) ) planCar,
- (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and iscompleted = 1 ) CompleteCar ,
- DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) carDate
- `).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": %s,
- "method": "uploadcarnumber",
- "rowCount": "1"
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- data := make(map[string]interface{})
- json.Unmarshal(msg.Payload(), &data)
- if _, ok := data["feedData"]; ok {
- for _, item := range data["feedData"].([]map[string]interface{}) {
- tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
- WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
- ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
- }
- } else if _, ok := data["barData"]; ok {
- for _, item := range data["barData"].([]map[string]interface{}) {
- tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
- WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
- ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
- }
- }
- }
|