123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- package api
- import (
- "bytes"
- "crypto/hmac"
- "crypto/sha1"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "time"
- "tmr-watch/conf/setting"
- "tmr-watch/http/handle/restful"
- "tmr-watch/pkg/logging"
- "github.com/astaxie/beego/logs"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/robfig/cron"
- )
- func InitMqttClient() {
- if setting.YynserverSetting.FarmId != "" {
- c, pubTopic := MqttClient()
- deviceHeartbeat(c, pubTopic)
- // now := time.Now().AddDate(0, 0, -5).Format("2006-01-02")
- // stirPush(c, pubTopic, now)
- // dustingPush(c, pubTopic, now)
- // equipmentAccuracyPush(c, pubTopic, now)
- // finishedWeightPush(c, pubTopic, now)
- // feedtempletPush(c, pubTopic)
- // CompletedTrainNumberPush(c, pubTopic, now)
- mqttCron := cron.New()
- mqttCron.AddFunc("10 06 * * *", func() {
- now := time.Now().AddDate(0, 0, -1).Format("2006-01-02")
- stirPush(c, pubTopic, now)
- dustingPush(c, pubTopic, now)
- equipmentAccuracyPush(c, pubTopic, now)
- finishedWeightPush(c, pubTopic, now)
- feedtempletPush(c, pubTopic)
- CompletedTrainNumberPush(c, pubTopic, now)
- })
- mqttCron.Start()
- }
- }
- func MqttClient() (MQTT.Client, string) {
- // set the device info, include product key, device name, and device secret
- // var productKey string = "a1NmXfrjL8M"
- // var deviceName string = "4776_p_breed"
- // var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4"
- // ProductKey
- // DeviceName
- // DeviceSecret
- var productKey string = setting.YynserverSetting.ProductKey
- var deviceName string = setting.YynserverSetting.DeviceName
- var deviceSecret string = setting.YynserverSetting.DeviceSecret
- // product_key =k03txxLKFae
- // device_name =4623_p_breed
- // device_secret =d06ababb2b10ba25bca3041e35ac604d
- // host = iot-010a5xth.mqtt.iothub.aliyuncs.com
- // farmId=1830004623
- // heartBeat=18300046234623_p_breed
- // TopicName=/k03txxLKFae/4623_p_breed/user/heatwatch/tmrBreed/post
- var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)
- var clientId string = "go" + setting.YynserverSetting.FarmId
- var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"
- var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"
- // set the login broker url
- var raw_broker bytes.Buffer
- raw_broker.WriteString("tcp://")
- raw_broker.WriteString("iot-010a5xth.mqtt.iothub.aliyuncs.com:1883")
- opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
- // calculate the login auth info, and set it into the connection options
- auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
- opts.SetClientID(auth.mqttClientId)
- opts.SetUsername(auth.username)
- opts.SetPassword(auth.password)
- opts.SetMaxReconnectInterval(1 * time.Second)
- opts.AutoReconnect = true
- // opts.SetKeepAlive(60 * 2 * time.Second)
- opts.OnConnect = func(c MQTT.Client) {
- if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
- logging.Error("mqtt Subscribe err: ", token.Error())
- os.Exit(1)
- }
- }
- c := mqtt.NewClient(opts)
- if token := c.Connect(); token.Wait() && token.Error() != nil {
- fmt.Println(token.Error())
- os.Exit(1)
- }
- fmt.Print("Connect aliyun IoT Cloud Sucess\n")
- return c, pubTopic
- }
// NewTLSConfig returns a TLS configuration whose trusted roots are the
// certificates found in ./x509/root.pem, resolved against the working
// directory.
//
// A non-nil configuration is always returned. When the file cannot be read, or
// when it contains no certificate that can be parsed, the problem is printed
// to standard output and the returned configuration carries an empty root
// pool.
func NewTLSConfig() *tls.Config {
	const caFile = "./x509/root.pem"

	roots := x509.NewCertPool()
	pemCerts, err := ioutil.ReadFile(caFile)
	switch {
	case err != nil:
		// Keep the historical message but include the cause.
		fmt.Println("0. read file error, game over!!", err)
	case !roots.AppendCertsFromPEM(pemCerts):
		// The file was readable but held no usable PEM certificate.
		fmt.Println("no CA certificate could be parsed from", caFile)
	}

	return &tls.Config{
		// RootCAs are the authorities used to verify the server certificate.
		RootCAs: roots,
		// ClientAuth and ClientCAs are consulted by TLS servers only; they
		// are spelled out here at their zero values.
		ClientAuth: tls.NoClientCert,
		ClientCAs:  nil,
		// Keep certificate chain and host name verification enabled.
		InsecureSkipVerify: false,
	}
}
- // define a function for the default message handler
- var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
- fmt.Printf("TOPIC: %s\n", msg.Topic())
- fmt.Printf("MSG: %s\n", msg.Payload())
- }
// AuthInfo holds the three login values produced by calculate_sign.
type AuthInfo struct {
	// password is the lowercase hexadecimal HMAC-SHA1 signature.
	password string
	// username is "<deviceName>&<productKey>".
	username string
	// mqttClientId is the caller's client id followed by the "|...|"
	// parameter section.
	mqttClientId string
}

// calculate_sign derives the MQTT login values for a device.
//
// The signed content is the concatenation, in this fixed order, of
// "clientId"+clientId, "deviceName"+deviceName, "productKey"+productKey and
// "timestamp"+timeStamp. It is signed with HMAC-SHA1 keyed by deviceSecret and
// the digest is returned hex-encoded as the password.
//
// Neither the signed content nor the resulting password is written to
// standard output: both are credentials.
//
// NOTE(review): the client id advertises securemode=2, which the original
// comment describes as a TLS connection, while MqttClient dials a tcp:// URL
// on port 1883 — confirm the intended mode with the broker documentation.
func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
	var content bytes.Buffer
	for _, part := range []string{
		"clientId", clientId,
		"deviceName", deviceName,
		"productKey", productKey,
		"timestamp", timeStamp,
	} {
		content.WriteString(part)
	}

	// hmac, use sha1
	mac := hmac.New(sha1.New, []byte(deviceSecret))
	mac.Write(content.Bytes())

	return AuthInfo{
		password:     fmt.Sprintf("%02x", mac.Sum(nil)),
		username:     deviceName + "&" + productKey,
		mqttClientId: clientId + "|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=" + timeStamp + "|",
	}
}
- func feedtempletPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- f.id AS recipeId,
- f.tname recipeName,
- ft.id ingId,
- ifnull(fd.fname,fdy.fname) ingName,
- if(fd.fname is not null, ft.fweight,fty.fweight) afQty,
- ft.sort mixNo,
- ifnull(fd.allowratio,fdy.allowratio) allowableError,
- ifnull(fd.fclass,fdy.fclass) ingType,
- if(fd.fname is not null,ft.fweight * ( fd.dry / 100 ), fty.fweight * ( fdy.dry / 100 )) dmQty,
- '' recipeCost
- FROM
- feedtemplet f
- JOIN ftdetail ft ON ft.ftid = f.id
- left JOIN feed fd ON fd.id = ft.fid
- left join feedtemplet fy on fy.id = ft.preftid
- left join ftdetail fty on fty.ftid = fy.id
- left JOIN feed fdy ON fdy.id = fty.fid
-
- `).QueryString()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": "%s",
- "method":"getfeedtempletinfo",
- "rowCount": "1",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
- // c.Publish(pubTopic, 2, false, pushStr)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- func stirPush(c MQTT.Client, pubTopic, date string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- DATE_FORMAT(d.mydate,'%Y-%m-%d') loadDate,
- d.sort tmrNo,
- d.times loadShift,
- d.tempid recipeId,
- d.templetname recipeName,
- f.feedcode ingId,
- d1.fname ingName,
- 12 ingType,
- f.dry dmPct,
- d1.sort mixNo,
- d1.feedallowratio allowable_error,
- d1.lweight expWeight,
- d1.actualweightminus actualWeight,
- if((select count(1) from downloadplandtl1 where pid = d.id and sort < d1.sort order by sort desc) >0 ,(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl1 where pid = d.id and sort < d1.sort order by sort desc limit 1),DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') ) startTime,
- DATE_FORMAT(d1.intime,'%Y-%m-%d %H:%i:%s') endTime , ifnull(if(d.driverId !=0 ,(select drivername from driver where id = d.driverId),(SELECT dr.driver FROM dutyrecord dr
- WHERE dr.pastureid = d.pastureid AND dr.eqid = d.tmrid and dr.times= d.times AND dr.operatetime <=d.mydate
- ORDER BY dr.operatetime DESC LIMIT 1)),'')tmrName
- FROM
- downloadedplan d
- JOIN downloadplandtl1 d1 ON d1.pid = d.id
- JOIN feed f ON f.feedcode = d1.feedcode
- AND f.pastureid = d.pastureid
- WHERE
- DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = ?
- `, date).QueryString()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": "%s",
- "method":"uploadadddata",
- "rowCount": "%d",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, len(dataList), string(b))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- // 撒料信息
- func dustingPush(c MQTT.Client, pubTopic, date string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- ifnull(if(d.driverId !=0 ,(select drivername from driver where id = d.driverId),(SELECT dr.driver FROM dutyrecord dr
- WHERE dr.pastureid = d.pastureid AND dr.eqid = d.tmrid and dr.times= d.times AND dr.operatetime <=d.mydate
- ORDER BY dr.operatetime DESC LIMIT 1)),'')tmrName ,
- DATE_FORMAT(d.mydate,'%Y-%m-%d') dropDate,
- d.sort tmrNo,
- d.times dropShift,
- d2.fbarid penId,
- b.bname penName,
- fp.ccount cowCount,
- d2.sort feedingNo,
- d2.lweight expWeight,
- d2.actualweightminus actualWeight,
- if((select count(1) from downloadplandtl2 where pid = d.id and sort < d2.sort order by sort desc) >0 ,(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl2 where pid = d.id and sort < d2.sort order by sort desc limit 1), (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl1 where pid = d.id order by sort desc limit 1) ) startTime,
- DATE_FORMAT(d2.intime,'%Y-%m-%d %H:%i:%s') endTime
- FROM
- downloadedplan d
- JOIN downloadplandtl2 d2 ON d2.pid = d.id
- JOIN bar b ON b.id = d2.fbarid
- join feedp fp on fp.barid = b.id
- join tmr t on t.id = d.tmrid
- left join driver on driver.drivercode = t.eqcode
- WHERE
- DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = ? `, date).QueryString()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": "%s",
- "method":"uploaddiliverdata",
- "rowCount": "%d",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, len(dataList), string(b))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- //设备心跳
- func deviceHeartbeat(c MQTT.Client, pubTopic string) {
- pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // go func() {
- duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)
- duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)
- spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))
- // for {
- device := cron.New()
- device.AddFunc(spec1, func() {
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error(), time.Now())
- // token.Wait()
- })
- // }
- device.Start()
- // }()
- }
- // 准确率
- func equipmentAccuracyPush(c MQTT.Client, pubTopic, date string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- t.tname Name,
- 1-abs (
- sum( d1.actualweightminus )- sum( d1.lweight ))/ sum( d1.lweight ) Rate,
- d.mydate RateDate , DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') startTime,
- (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl2 where pid = d.id order by sort desc limit 1) endTime
- FROM
- downloadedplan d
- JOIN downloadplandtl1 d1 ON d1.pid = d.id
- JOIN tmr t ON t.datacaptureno = d.datacaptureno
- WHERE
- DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = ? and d.lpplantype in(0,1)
- GROUP BY
- d.datacaptureno`, date).QueryString()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": "%s",
- "method": "uploadrate",
- "rowCount": %d
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- // 完成重量
- func finishedWeightPush(c MQTT.Client, pubTopic, date string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- sum( d1.actualweightminus ) completeWeight,
- sum( d1.lweight ) planWeight,
- d.mydate weightDate , DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') startTime,
- (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where mydate = d.mydate and intime is not null order by sort desc limit 1) endTime
- FROM
- downloadedplan d
- JOIN downloadplandtl1 d1 ON d1.pid = d.id
- JOIN tmr t ON t.datacaptureno = d.datacaptureno
- WHERE
- DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = ? and lpplantype in(0,1)`, date).QueryString()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": "%s",
- "method": "uploadweight",
- "rowCount": "1"
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- // 完成车次
- func CompletedTrainNumberPush(c MQTT.Client, pubTopic, date string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(` select (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and lpplantype in(0,1) ) planCar,
- (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1)) CompleteCar ,
- ? carDate,
- (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1) order by intime asc limit 1 ) startTime,
- (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1) and intime is not null order by intime desc limit 1 ) endTime
- `, date, date, date, date, date).QueryString()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": "%s",
- "method": "uploadcarnumber",
- "rowCount": "1"
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
- // token.Wait()
- // time.Sleep(2 * time.Second)
- }
- }
- func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- data := make(map[string]interface{})
- json.Unmarshal(msg.Payload(), &data)
- if _, ok := data["feedData"]; ok {
- for _, item := range data["feedData"].([]map[string]interface{}) {
- tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
- WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
- ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
- }
- } else if _, ok := data["barData"]; ok {
- for _, item := range data["barData"].([]map[string]interface{}) {
- tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
- WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
- ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
- }
- }
- }
|