123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 |
- package api
- import (
- "bytes"
- "crypto/hmac"
- "crypto/sha1"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "time"
- "tmr-watch/conf/setting"
- "tmr-watch/http/handle/restful"
- "tmr-watch/pkg/logging"
- "github.com/astaxie/beego/logs"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/robfig/cron"
- )
- func InitMqttClient() {
- if setting.YynserverSetting.FarmId != "" {
- c, pubTopic := MqttClient()
- deviceHeartbeat(c, pubTopic)
- mqttCron := cron.New()
- mqttCron.AddFunc("10 06 * * *", func() {
- feedtempletPush(c, pubTopic)
- stirPush(c, pubTopic)
- dustingPush(c, pubTopic)
- equipmentAccuracyPush(c, pubTopic)
- finishedWeightPush(c, pubTopic)
- CompletedTrainNumberPush(c, pubTopic)
- })
- mqttCron.Start()
- }
- }
- func MqttClient() (MQTT.Client, string) {
-
-
-
-
-
-
-
- var productKey string = setting.YynserverSetting.ProductKey
- var deviceName string = setting.YynserverSetting.DeviceName
- var deviceSecret string = setting.YynserverSetting.DeviceSecret
-
- var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)
- var clientId string = "go_device_id_0001"
- var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"
- var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"
-
- var raw_broker bytes.Buffer
- raw_broker.WriteString("tls://")
- raw_broker.WriteString(productKey)
- raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")
- opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
-
- auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
- opts.SetClientID(auth.mqttClientId)
- opts.SetUsername(auth.username)
- opts.SetPassword(auth.password)
- opts.SetMaxReconnectInterval(1 * time.Second)
- opts.AutoReconnect = true
-
- opts.OnConnect = func(c MQTT.Client) {
- if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
- logging.Error("mqtt Subscribe err: ", token.Error())
- os.Exit(1)
- }
- }
-
- c := MQTT.NewClient(opts)
- if token := c.Connect(); token.Wait() && token.Error() != nil {
- logging.Error("mqtt Connect err: ", token.Error())
- }
- fmt.Print("Connect aliyun IoT Cloud Sucess\n")
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- return c, pubTopic
- }
- func NewTLSConfig() *tls.Config {
-
-
- certpool := x509.NewCertPool()
- pemCerts, err := ioutil.ReadFile("./x509/root.pem")
- if err != nil {
- fmt.Println("0. read file error, game over!!")
- }
- certpool.AppendCertsFromPEM(pemCerts)
-
- return &tls.Config{
-
- RootCAs: certpool,
-
-
-
- ClientAuth: tls.NoClientCert,
-
- ClientCAs: nil,
-
-
- InsecureSkipVerify: false,
-
-
- }
- }
- var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
- fmt.Printf("TOPIC: %s\n", msg.Topic())
- fmt.Printf("MSG: %s\n", msg.Payload())
- }
- type AuthInfo struct {
- password, username, mqttClientId string
- }
- func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
- var raw_passwd bytes.Buffer
- raw_passwd.WriteString("clientId" + clientId)
- raw_passwd.WriteString("deviceName")
- raw_passwd.WriteString(deviceName)
- raw_passwd.WriteString("productKey")
- raw_passwd.WriteString(productKey)
- raw_passwd.WriteString("timestamp")
- raw_passwd.WriteString(timeStamp)
- fmt.Println(raw_passwd.String())
-
- mac := hmac.New(sha1.New, []byte(deviceSecret))
- mac.Write([]byte(raw_passwd.String()))
- password := fmt.Sprintf("%02x", mac.Sum(nil))
- fmt.Println(password)
- username := deviceName + "&" + productKey
- var MQTTClientId bytes.Buffer
- MQTTClientId.WriteString(clientId)
-
- MQTTClientId.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
- MQTTClientId.WriteString(timeStamp)
- MQTTClientId.WriteString("|")
- auth := AuthInfo{password: password, username: username, mqttClientId: MQTTClientId.String()}
- return auth
- }
- func feedtempletPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select f.id as recipeId , f.tname recipeName,ft.id ingId,ft.fname ingName,ft.fweight afQty,
- ft.sort mixNo,fd.allowratio allowableError ,fd.fclass ingType, ft.fweight * ( fd.dry /100 ) dmQty,null recipeCost
- from feedtemplet f join ftdetail ft on ft.ftid = f.id join feed fd on fd.id = ft.fid
- `).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": %s,
- "method":"getfeedtempletinfo",
- "rowCount": "1",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
-
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
-
- }
- }
- func stirPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- d.mydate dropDate,
- d.projname tmrNo,
- d.times loadShift,
- d.tempid recipeId,
- d.templetname recipeName,
- f.feedcode ingId,
- d1.fname ingName,
- f.fclass ingType,
- f.dry dmPct,
- d1.sort mixNo,
- d1.feedallowratio allowableError,
- d1.lweight expWeight,
- d1.actualweightminus actualWeight,
- d1.begintime startTime,
- d1.intime endTime
- FROM
- downloadedplan d
- JOIN downloadplandtl1 d1 ON d1.pid = d.id
- JOIN feed f ON f.feedcode = d1.feedcode
- AND f.pastureid = d.pastureid
- WHERE
- DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": %s,
- "method":"uploadadddata",
- "rowCount": "1",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
-
- }
- }
- func dustingPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`SELECT
- d.mydate dropDate,
- d.projname tmrNo,
- d.times loadShift,
- d2.fbarid penId,
- b.bcode penName,
- d2.cowcount cowCount,
- d2.sort feedingNo,
- d2.lweight expWeight,
- d2.actualweightminus actualWeight,
- d2.begintime startTime,
- d2.intime endTime
- FROM
- downloadedplan d
- JOIN downloadplandtl2 d2 ON d2.pid = d.id
- JOIN bar b ON b.id = d2.fbarid
- WHERE
- DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "farmId": %s,
- "method":"uploaddiliverdata",
- "rowCount": "1",
- "resultData":%s
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
-
- }
- }
- func deviceHeartbeat(c MQTT.Client, pubTopic string) {
- pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
-
- duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)
- duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)
- spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))
-
- device := cron.New()
- device.AddFunc(spec1, func() {
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
- })
-
- device.Start()
-
- }
- func equipmentAccuracyPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select t.tname tname,1-abs(sum(d1.actualweightminus)- sum(d1.lweight))/sum(d1.lweight) rate ,d.mydate rateDate from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno
- where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )
- group by d.datacaptureno`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": %s,
- "method": "uploadrate",
- "rowCount": %d
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
-
- }
- }
- func finishedWeightPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select sum(d1.actualweightminus) CompleteWeught,sum(d1.lweight) planWeight,d.mydate weightDate
- from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno
- where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )`).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": %s,
- "method": "uploadweight",
- "rowCount": "1"
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
-
- }
- }
- func CompletedTrainNumberPush(c MQTT.Client, pubTopic string) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- dataList, err := tx.SQL(`select (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) ) planCar,
- (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and iscompleted = 1 ) CompleteCar ,
- DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) carDate
- `).Query().List()
- if err != nil {
- logs.Error("feedtempletPush-error-1:", err)
- return
- }
- pushStr := `{
- "apiId": "getKPTData",
- "param": {
- "resultData": %s,
- "farmId": %s,
- "method": "uploadcarnumber",
- "rowCount": "1"
- }
- }`
- if len(dataList) > 0 {
- b, _ := json.Marshal(dataList)
- pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
- token := c.Publish(pubTopic, 2, false, pushStr)
- fmt.Println("publish msg: ", pushStr, token.Error())
-
-
- }
- }
- func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
- tx := restful.Engine.NewSession()
- defer tx.Close()
- data := make(map[string]interface{})
- json.Unmarshal(msg.Payload(), &data)
- if _, ok := data["feedData"]; ok {
- for _, item := range data["feedData"].([]map[string]interface{}) {
- tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
- WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
- ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
- }
- } else if _, ok := data["barData"]; ok {
- for _, item := range data["barData"].([]map[string]interface{}) {
- tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
- WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
- ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
- }
- }
- }
|