| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 | 
							- package api
 
- import (
 
- 	"bytes"
 
- 	"crypto/hmac"
 
- 	"crypto/sha1"
 
- 	"crypto/tls"
 
- 	"crypto/x509"
 
- 	"encoding/json"
 
- 	"fmt"
 
- 	"io/ioutil"
 
- 	"os"
 
- 	"strconv"
 
- 	"time"
 
- 	"tmr-watch/conf/setting"
 
- 	"tmr-watch/http/handle/restful"
 
- 	"tmr-watch/pkg/logging"
 
- 	"github.com/astaxie/beego/logs"
 
- 	MQTT "github.com/eclipse/paho.mqtt.golang"
 
- 	"github.com/robfig/cron"
 
- )
 
- func InitMqttClient() {
 
- 	if setting.YynserverSetting.FarmId != "" {
 
- 		c, pubTopic := MqttClient()
 
- 		deviceHeartbeat(c, pubTopic)
 
- 		mqttCron := cron.New()
 
- 		mqttCron.AddFunc("10 06 * * *", func() {
 
- 			feedtempletPush(c, pubTopic)
 
- 			stirPush(c, pubTopic)
 
- 			dustingPush(c, pubTopic)
 
- 			equipmentAccuracyPush(c, pubTopic)
 
- 			finishedWeightPush(c, pubTopic)
 
- 			CompletedTrainNumberPush(c, pubTopic)
 
- 		})
 
- 		mqttCron.Start()
 
- 	}
 
- }
 
- func MqttClient() (MQTT.Client, string) {
 
- 	// set the device info, include product key, device name, and device secret
 
- 	// var productKey string = "a1NmXfrjL8M"
 
- 	// var deviceName string = "4776_p_breed"
 
- 	// var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4"
 
- 	// 	ProductKey
 
- 	// DeviceName
 
- 	// DeviceSecret
 
- 	var productKey string = setting.YynserverSetting.ProductKey
 
- 	var deviceName string = setting.YynserverSetting.DeviceName
 
- 	var deviceSecret string = setting.YynserverSetting.DeviceSecret
 
- 	// set timestamp, clientid, subscribe topic and publish topic
 
- 	var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)
 
- 	var clientId string = "go_device_id_0001"
 
- 	var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"
 
- 	var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"
 
- 	// set the login broker url
 
- 	var raw_broker bytes.Buffer
 
- 	raw_broker.WriteString("tls://")
 
- 	raw_broker.WriteString(productKey)
 
- 	raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")
 
- 	opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
 
- 	// calculate the login auth info, and set it into the connection options
 
- 	auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
 
- 	opts.SetClientID(auth.mqttClientId)
 
- 	opts.SetUsername(auth.username)
 
- 	opts.SetPassword(auth.password)
 
- 	opts.SetMaxReconnectInterval(1 * time.Second)
 
- 	opts.AutoReconnect = true
 
- 	// opts.SetKeepAlive(60 * 2 * time.Second)
 
- 	opts.OnConnect = func(c MQTT.Client) {
 
- 		if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
 
- 			logging.Error("mqtt Subscribe err: ", token.Error())
 
- 			os.Exit(1)
 
- 		}
 
- 	}
 
- 	// create and start a client using the above ClientOptions
 
- 	c := MQTT.NewClient(opts)
 
- 	if token := c.Connect(); token.Wait() && token.Error() != nil {
 
- 		logging.Error("mqtt Connect err: ", token.Error())
 
- 	}
 
- 	fmt.Print("Connect aliyun IoT Cloud Sucess\n")
 
- 	// if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
 
- 	// 	fmt.Println(token.Error())
 
- 	// 	os.Exit(1)
 
- 	// }
 
- 	// subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered
 
- 	// fmt.Println("Subscribe topic "+subTopic+" success\n", c.IsConnected())
 
- 	// publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update")
 
- 	// for i := 0; i < 50; i++ {
 
- 	// 	fmt.Println("publish msg:", i)
 
- 	// 	text := fmt.Sprintf("ABC #%d", i)
 
- 	// 	token := c.Publish(pubTopic, 0, false, text)
 
- 	// 	fmt.Println("publish msg: ", text)
 
- 	// 	token.Wait()
 
- 	// 	time.Sleep(2 * time.Second)
 
- 	// }
 
- 	// unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get")
 
- 	// if token := c.Unsubscribe(subTopic); token.Wait() && token.Error() != nil {
 
- 	// 	fmt.Println(token.Error())
 
- 	// 	os.Exit(1)
 
- 	// }
 
- 	// c.Disconnect(250)
 
- 	return c, pubTopic
 
- }
 
- func NewTLSConfig() *tls.Config {
 
- 	// Import trusted certificates from CAfile.pem.
 
- 	// Alternatively, manually add CA certificates to default openssl CA bundle.
 
- 	certpool := x509.NewCertPool()
 
- 	pemCerts, err := ioutil.ReadFile("./x509/root.pem")
 
- 	if err != nil {
 
- 		fmt.Println("0. read file error, game over!!")
 
- 	}
 
- 	certpool.AppendCertsFromPEM(pemCerts)
 
- 	// Create tls.Config with desired tls properties
 
- 	return &tls.Config{
 
- 		// RootCAs = certs used to verify server cert.
 
- 		RootCAs: certpool,
 
- 		// ClientAuth = whether to request cert from server.
 
- 		// Since the server is set up for SSL, this happens
 
- 		// anyways.
 
- 		ClientAuth: tls.NoClientCert,
 
- 		// ClientCAs = certs used to validate client cert.
 
- 		ClientCAs: nil,
 
- 		// InsecureSkipVerify = verify that cert contents
 
- 		// match server. IP matches what is in cert etc.
 
- 		InsecureSkipVerify: false,
 
- 		// Certificates = list of certs client sends to server.
 
- 		// Certificates: []tls.Certificate{cert},
 
- 	}
 
- }
 
- // define a function for the default message handler
 
- var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
 
- 	fmt.Printf("TOPIC: %s\n", msg.Topic())
 
- 	fmt.Printf("MSG: %s\n", msg.Payload())
 
- }
 
- type AuthInfo struct {
 
- 	password, username, mqttClientId string
 
- }
 
- func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
 
- 	var raw_passwd bytes.Buffer
 
- 	raw_passwd.WriteString("clientId" + clientId)
 
- 	raw_passwd.WriteString("deviceName")
 
- 	raw_passwd.WriteString(deviceName)
 
- 	raw_passwd.WriteString("productKey")
 
- 	raw_passwd.WriteString(productKey)
 
- 	raw_passwd.WriteString("timestamp")
 
- 	raw_passwd.WriteString(timeStamp)
 
- 	fmt.Println(raw_passwd.String())
 
- 	// hmac, use sha1
 
- 	mac := hmac.New(sha1.New, []byte(deviceSecret))
 
- 	mac.Write([]byte(raw_passwd.String()))
 
- 	password := fmt.Sprintf("%02x", mac.Sum(nil))
 
- 	fmt.Println(password)
 
- 	username := deviceName + "&" + productKey
 
- 	var MQTTClientId bytes.Buffer
 
- 	MQTTClientId.WriteString(clientId)
 
- 	// hmac, use sha1; securemode=2 means TLS connection
 
- 	MQTTClientId.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
 
- 	MQTTClientId.WriteString(timeStamp)
 
- 	MQTTClientId.WriteString("|")
 
- 	auth := AuthInfo{password: password, username: username, mqttClientId: MQTTClientId.String()}
 
- 	return auth
 
- }
 
- func feedtempletPush(c MQTT.Client, pubTopic string) {
 
- 	tx := restful.Engine.NewSession()
 
- 	defer tx.Close()
 
- 	dataList, err := tx.SQL(`select f.id  as recipeId , f.tname recipeName,ft.id ingId,ft.fname ingName,ft.fweight  afQty,
 
- 	ft.sort mixNo,fd.allowratio allowableError ,fd.fclass ingType, ft.fweight * ( fd.dry /100 )  dmQty,null recipeCost 
 
- 	 from feedtemplet  f   join ftdetail ft  on ft.ftid = f.id  join feed fd on fd.id = ft.fid	
 
- 	`).Query().List()
 
- 	if err != nil {
 
- 		logs.Error("feedtempletPush-error-1:", err)
 
- 		return
 
- 	}
 
- 	pushStr := `{
 
- 		"apiId": "getKPTData", 
 
- 		"param": {
 
- 		"farmId": %s, 
 
- 				  "method":"getfeedtempletinfo",
 
- 				  "rowCount": "1",
 
- 		"resultData":%s
 
- 		 }
 
- 	}`
 
- 	if len(dataList) > 0 {
 
- 		b, _ := json.Marshal(dataList)
 
- 		pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
 
- 		// c.Publish(pubTopic, 2, false, pushStr)
 
- 		token := c.Publish(pubTopic, 2, false, pushStr)
 
- 		fmt.Println("publish msg: ", pushStr, token.Error())
 
- 		// token.Wait()
 
- 		// time.Sleep(2 * time.Second)
 
- 	}
 
- }
 
- func stirPush(c MQTT.Client, pubTopic string) {
 
- 	tx := restful.Engine.NewSession()
 
- 	defer tx.Close()
 
- 	dataList, err := tx.SQL(`SELECT
 
- 	d.mydate dropDate,
 
- 	d.projname tmrNo,
 
- 	d.times loadShift,
 
- 	d.tempid recipeId,
 
- 	d.templetname recipeName,
 
- 	f.feedcode ingId,
 
- 	d1.fname ingName,
 
- 	f.fclass ingType,
 
- 	f.dry dmPct,
 
- 	d1.sort mixNo,
 
- 	d1.feedallowratio allowableError,
 
- 	d1.lweight expWeight,
 
- 	d1.actualweightminus actualWeight,
 
- 	d1.begintime startTime,
 
- 	d1.intime endTime 
 
- FROM
 
- 	downloadedplan d
 
- 	JOIN downloadplandtl1 d1 ON d1.pid = d.id
 
- 	JOIN feed f ON f.feedcode = d1.feedcode 
 
- 	AND f.pastureid = d.pastureid 
 
- WHERE
 
- 	DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
 
- 	if err != nil {
 
- 		logs.Error("feedtempletPush-error-1:", err)
 
- 		return
 
- 	}
 
- 	pushStr := `{
 
- 		"apiId": "getKPTData", 
 
- 	   "param": {
 
- 	   "farmId": %s, 
 
- 				 "method":"uploadadddata",
 
- 				"rowCount": "1",
 
- 	   "resultData":%s
 
- 		}
 
- 	   }`
 
- 	if len(dataList) > 0 {
 
- 		b, _ := json.Marshal(dataList)
 
- 		pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
 
- 		token := c.Publish(pubTopic, 2, false, pushStr)
 
- 		fmt.Println("publish msg: ", pushStr, token.Error())
 
- 		// token.Wait()
 
- 		// time.Sleep(2 * time.Second)
 
- 	}
 
- }
 
- // 撒料信息
 
- func dustingPush(c MQTT.Client, pubTopic string) {
 
- 	tx := restful.Engine.NewSession()
 
- 	defer tx.Close()
 
- 	dataList, err := tx.SQL(`SELECT
 
- 	d.mydate dropDate,
 
- 	d.projname tmrNo,
 
- 	d.times loadShift,
 
-   d2.fbarid  penId,
 
-   b.bcode penName,
 
- 	d2.cowcount cowCount,
 
- 	d2.sort feedingNo,
 
- 	d2.lweight expWeight,
 
- 	d2.actualweightminus actualWeight,
 
- 	d2.begintime startTime,
 
- 	d2.intime endTime
 
- FROM
 
- 	downloadedplan d
 
- 	JOIN downloadplandtl2 d2 ON d2.pid = d.id
 
- 	JOIN bar b ON b.id = d2.fbarid 
 
- WHERE
 
- 	DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
 
- 	if err != nil {
 
- 		logs.Error("feedtempletPush-error-1:", err)
 
- 		return
 
- 	}
 
- 	pushStr := `{
 
- 		"apiId": "getKPTData", 
 
- 		"param": {
 
- 		"farmId": %s, 
 
- 				  "method":"uploaddiliverdata",
 
- 				  "rowCount": "1",
 
- 		"resultData":%s
 
- 		 }
 
- 		}`
 
- 	if len(dataList) > 0 {
 
- 		b, _ := json.Marshal(dataList)
 
- 		pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
 
- 		token := c.Publish(pubTopic, 2, false, pushStr)
 
- 		fmt.Println("publish msg: ", pushStr, token.Error())
 
- 		// token.Wait()
 
- 		// time.Sleep(2 * time.Second)
 
- 	}
 
- }
 
- //设备心跳
 
- func deviceHeartbeat(c MQTT.Client, pubTopic string) {
 
- 	pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
 
- 	token := c.Publish(pubTopic, 2, false, pushStr)
 
- 	fmt.Println("publish msg: ", pushStr, token.Error())
 
- 	// token.Wait()
 
- 	// go func() {
 
- 	duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)
 
- 	duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)
 
- 	spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))
 
- 	// for {
 
- 	device := cron.New()
 
- 	device.AddFunc(spec1, func() {
 
- 		token := c.Publish(pubTopic, 2, false, pushStr)
 
- 		fmt.Println("publish msg: ", pushStr, token.Error())
 
- 		// token.Wait()
 
- 	})
 
- 	// }
 
- 	device.Start()
 
- 	// }()
 
- }
 
- // 准确率
 
- func equipmentAccuracyPush(c MQTT.Client, pubTopic string) {
 
- 	tx := restful.Engine.NewSession()
 
- 	defer tx.Close()
 
- 	dataList, err := tx.SQL(`select t.tname tname,1-abs(sum(d1.actualweightminus)- sum(d1.lweight))/sum(d1.lweight) rate ,d.mydate rateDate  from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id  join tmr t on t.datacaptureno = d.datacaptureno 
 
- 	where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )
 
- 	 group by d.datacaptureno`).Query().List()
 
- 	if err != nil {
 
- 		logs.Error("feedtempletPush-error-1:", err)
 
- 		return
 
- 	}
 
- 	pushStr := `{
 
- 		"apiId": "getKPTData",
 
- 		"param": {
 
- 			"resultData": %s,
 
- 			"farmId": %s,         
 
- 			"method": "uploadrate",          
 
- 			"rowCount": %d                  
 
- 		}
 
- 	}`
 
- 	if len(dataList) > 0 {
 
- 		b, _ := json.Marshal(dataList)
 
- 		pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))
 
- 		token := c.Publish(pubTopic, 2, false, pushStr)
 
- 		fmt.Println("publish msg: ", pushStr, token.Error())
 
- 		// token.Wait()
 
- 		// time.Sleep(2 * time.Second)
 
- 	}
 
- }
 
- // 完成重量
 
- func finishedWeightPush(c MQTT.Client, pubTopic string) {
 
- 	tx := restful.Engine.NewSession()
 
- 	defer tx.Close()
 
- 	dataList, err := tx.SQL(`select  sum(d1.actualweightminus) CompleteWeught,sum(d1.lweight) planWeight,d.mydate weightDate 
 
- 	 from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id  join tmr t on t.datacaptureno = d.datacaptureno 
 
- 	where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )`).Query().List()
 
- 	if err != nil {
 
- 		logs.Error("feedtempletPush-error-1:", err)
 
- 		return
 
- 	}
 
- 	pushStr := `{
 
- 		"apiId": "getKPTData",
 
- 		"param": {
 
- 			"resultData": %s,
 
- 			"farmId": %s,          
 
- 			"method": "uploadweight",
 
- 			"rowCount": "1"                 
 
- 		}
 
- 	}`
 
- 	if len(dataList) > 0 {
 
- 		b, _ := json.Marshal(dataList)
 
- 		pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
 
- 		token := c.Publish(pubTopic, 2, false, pushStr)
 
- 		fmt.Println("publish msg: ", pushStr, token.Error())
 
- 		// token.Wait()
 
- 		// time.Sleep(2 * time.Second)
 
- 	}
 
- }
 
- // 完成车次
 
- func CompletedTrainNumberPush(c MQTT.Client, pubTopic string) {
 
- 	tx := restful.Engine.NewSession()
 
- 	defer tx.Close()
 
- 	dataList, err := tx.SQL(`select (select count(1) from downloadedplan  where  DATE_FORMAT(mydate ,'%Y-%m-%d' ) =  DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )  )  planCar, 
 
- 	(select count(1) from downloadedplan  where  DATE_FORMAT(mydate ,'%Y-%m-%d' ) =  DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and iscompleted = 1 ) CompleteCar ,
 
- 	DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) carDate
 
- 	`).Query().List()
 
- 	if err != nil {
 
- 		logs.Error("feedtempletPush-error-1:", err)
 
- 		return
 
- 	}
 
- 	pushStr := `{
 
- 		"apiId": "getKPTData",
 
- 		"param": {
 
- 			"resultData": %s,
 
- 			"farmId": %s,       
 
- 			"method": "uploadcarnumber",
 
- 			"rowCount": "1"             
 
- 		}
 
- 	}`
 
- 	if len(dataList) > 0 {
 
- 		b, _ := json.Marshal(dataList)
 
- 		pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
 
- 		token := c.Publish(pubTopic, 2, false, pushStr)
 
- 		fmt.Println("publish msg: ", pushStr, token.Error())
 
- 		// token.Wait()
 
- 		// time.Sleep(2 * time.Second)
 
- 	}
 
- }
 
- func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
 
- 	tx := restful.Engine.NewSession()
 
- 	defer tx.Close()
 
- 	data := make(map[string]interface{})
 
- 	json.Unmarshal(msg.Payload(), &data)
 
- 	if _, ok := data["feedData"]; ok {
 
- 		for _, item := range data["feedData"].([]map[string]interface{}) {
 
- 			tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS 
 
- 			WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
 
- 			 ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
 
- 		}
 
- 	} else if _, ok := data["barData"]; ok {
 
- 		for _, item := range data["barData"].([]map[string]interface{}) {
 
- 			tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS 
 
- 				WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
 
- 				 ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
 
- 		}
 
- 	}
 
- }
 
 
  |