|| package apiimport (	"bytes"	"crypto/hmac"	"crypto/sha1"	"crypto/tls"	"crypto/x509"	"encoding/json"	"fmt"	"io/ioutil"	"os"	"strconv"	"time"	"tmr-watch/conf/setting"	"tmr-watch/http/handle/restful"	"tmr-watch/pkg/logging"	"github.com/astaxie/beego/logs"	MQTT "github.com/eclipse/paho.mqtt.golang"	"github.com/robfig/cron")func InitMqttClient() {	if setting.YynserverSetting.FarmId != "" {		c, pubTopic := MqttClient()		deviceHeartbeat(c, pubTopic)		mqttCron := cron.New()		mqttCron.AddFunc("10 06 * * *", func() {			feedtempletPush(c, pubTopic)			stirPush(c, pubTopic)			dustingPush(c, pubTopic)			equipmentAccuracyPush(c, pubTopic)			finishedWeightPush(c, pubTopic)			CompletedTrainNumberPush(c, pubTopic)		})		mqttCron.Start()	}}func MqttClient() (MQTT.Client, string) {	// set the device info, include product key, device name, and device secret	// var productKey string = "a1NmXfrjL8M"	// var deviceName string = "4776_p_breed"	// var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4"	// 	ProductKey	// DeviceName	// DeviceSecret	var productKey string = setting.YynserverSetting.ProductKey	var deviceName string = setting.YynserverSetting.DeviceName	var deviceSecret string = setting.YynserverSetting.DeviceSecret	// set timestamp, clientid, subscribe topic and publish topic	var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)	var clientId string = "go_device_id_0001"	var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"	var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"	// set the login broker url	var raw_broker bytes.Buffer	raw_broker.WriteString("tls://")	raw_broker.WriteString(productKey)	raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")	opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())	// calculate the login auth info, and set it into the connection options	auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)	opts.SetClientID(auth.mqttClientId)	opts.SetUsername(auth.username)	opts.SetPassword(auth.password)	opts.SetMaxReconnectInterval(1 * time.Second)	opts.AutoReconnect = true	// opts.SetKeepAlive(60 * 2 * time.Second)	opts.OnConnect = func(c MQTT.Client) {		if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {			logging.Error("mqtt Subscribe err: ", token.Error())			os.Exit(1)		}	}	// create and start a client using the above ClientOptions	c := MQTT.NewClient(opts)	if token := c.Connect(); token.Wait() && token.Error() != nil {		logging.Error("mqtt Connect err: ", token.Error())	}	fmt.Print("Connect aliyun IoT Cloud Sucess\n")	// if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {	// 	fmt.Println(token.Error())	// 	os.Exit(1)	// }	// subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered	// fmt.Println("Subscribe topic "+subTopic+" success\n", c.IsConnected())	// publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update")	// for i := 0; i < 50; i++ {	// 	fmt.Println("publish msg:", i)	// 	text := fmt.Sprintf("ABC #%d", i)	// 	token := c.Publish(pubTopic, 0, false, text)	// 	fmt.Println("publish msg: ", text)	// 	token.Wait()	// 	time.Sleep(2 * time.Second)	// }	// unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get")	// if token := c.Unsubscribe(subTopic); token.Wait() && token.Error() != nil {	// 	fmt.Println(token.Error())	// 	os.Exit(1)	// }	// c.Disconnect(250)	return c, pubTopic}func NewTLSConfig() *tls.Config {	// Import trusted certificates from CAfile.pem.	// Alternatively, manually add CA certificates to default openssl CA bundle.	certpool := x509.NewCertPool()	pemCerts, err := ioutil.ReadFile("./x509/root.pem")	if err != nil {		fmt.Println("0. read file error, game over!!")	}	certpool.AppendCertsFromPEM(pemCerts)	// Create tls.Config with desired tls properties	return &tls.Config{		// RootCAs = certs used to verify server cert.		RootCAs: certpool,		// ClientAuth = whether to request cert from server.		// Since the server is set up for SSL, this happens		// anyways.		ClientAuth: tls.NoClientCert,		// ClientCAs = certs used to validate client cert.		ClientCAs: nil,		// InsecureSkipVerify = verify that cert contents		// match server. IP matches what is in cert etc.		InsecureSkipVerify: false,		// Certificates = list of certs client sends to server.		// Certificates: []tls.Certificate{cert},	}}// define a function for the default message handlervar f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {	fmt.Printf("TOPIC: %s\n", msg.Topic())	fmt.Printf("MSG: %s\n", msg.Payload())}type AuthInfo struct {	password, username, mqttClientId string}func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {	var raw_passwd bytes.Buffer	raw_passwd.WriteString("clientId" + clientId)	raw_passwd.WriteString("deviceName")	raw_passwd.WriteString(deviceName)	raw_passwd.WriteString("productKey")	raw_passwd.WriteString(productKey)	raw_passwd.WriteString("timestamp")	raw_passwd.WriteString(timeStamp)	fmt.Println(raw_passwd.String())	// hmac, use sha1	mac := hmac.New(sha1.New, []byte(deviceSecret))	mac.Write([]byte(raw_passwd.String()))	password := fmt.Sprintf("%02x", mac.Sum(nil))	fmt.Println(password)	username := deviceName + "&" + productKey	var MQTTClientId bytes.Buffer	MQTTClientId.WriteString(clientId)	// hmac, use sha1; securemode=2 means TLS connection	MQTTClientId.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")	MQTTClientId.WriteString(timeStamp)	MQTTClientId.WriteString("|")	auth := AuthInfo{password: password, username: username, mqttClientId: MQTTClientId.String()}	return auth}func feedtempletPush(c MQTT.Client, pubTopic string) {	tx := restful.Engine.NewSession()	defer tx.Close()	dataList, err := tx.SQL(`select f.id  as recipeId , f.tname recipeName,ft.id ingId,ft.fname ingName,ft.fweight  afQty,	ft.sort mixNo,fd.allowratio allowableError ,fd.fclass ingType, ft.fweight * ( fd.dry /100 )  dmQty,null recipeCost 	 from feedtemplet  f   join ftdetail ft  on ft.ftid = f.id  join feed fd on fd.id = ft.fid		`).Query().List()	if err != nil {		logs.Error("feedtempletPush-error-1:", err)		return	}	pushStr := `{		"apiId": "getKPTData", 		"param": {		"farmId": %s, 				  "method":"getfeedtempletinfo",				  "rowCount": "1",		"resultData":%s		 }	}`	if len(dataList) > 0 {		b, _ := json.Marshal(dataList)		pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))		// c.Publish(pubTopic, 2, false, pushStr)		token := c.Publish(pubTopic, 2, false, pushStr)		fmt.Println("publish msg: ", pushStr, token.Error())		// token.Wait()		// time.Sleep(2 * time.Second)	}}func stirPush(c MQTT.Client, pubTopic string) {	tx := restful.Engine.NewSession()	defer tx.Close()	dataList, err := tx.SQL(`SELECT	d.mydate dropDate,	d.projname tmrNo,	d.times loadShift,	d.tempid recipeId,	d.templetname recipeName,	f.feedcode ingId,	d1.fname ingName,	f.fclass ingType,	f.dry dmPct,	d1.sort mixNo,	d1.feedallowratio allowableError,	d1.lweight expWeight,	d1.actualweightminus actualWeight,	d1.begintime startTime,	d1.intime endTime FROM	downloadedplan d	JOIN downloadplandtl1 d1 ON d1.pid = d.id	JOIN feed f ON f.feedcode = d1.feedcode 	AND f.pastureid = d.pastureid WHERE	DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()	if err != nil {		logs.Error("feedtempletPush-error-1:", err)		return	}	pushStr := `{		"apiId": "getKPTData", 	   "param": {	   "farmId": %s, 				 "method":"uploadadddata",				"rowCount": "1",	   "resultData":%s		}	   }`	if len(dataList) > 0 {		b, _ := json.Marshal(dataList)		pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))		token := c.Publish(pubTopic, 2, false, pushStr)		fmt.Println("publish msg: ", pushStr, token.Error())		// token.Wait()		// time.Sleep(2 * time.Second)	}}// 撒料信息func dustingPush(c MQTT.Client, pubTopic string) {	tx := restful.Engine.NewSession()	defer tx.Close()	dataList, err := tx.SQL(`SELECT	d.mydate dropDate,	d.projname tmrNo,	d.times loadShift,  d2.fbarid  penId,  b.bcode penName,	d2.cowcount cowCount,	d2.sort feedingNo,	d2.lweight expWeight,	d2.actualweightminus actualWeight,	d2.begintime startTime,	d2.intime endTimeFROM	downloadedplan d	JOIN downloadplandtl2 d2 ON d2.pid = d.id	JOIN bar b ON b.id = d2.fbarid WHERE	DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()	if err != nil {		logs.Error("feedtempletPush-error-1:", err)		return	}	pushStr := `{		"apiId": "getKPTData", 		"param": {		"farmId": %s, 				  "method":"uploaddiliverdata",				  "rowCount": "1",		"resultData":%s		 }		}`	if len(dataList) > 0 {		b, _ := json.Marshal(dataList)		pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))		token := c.Publish(pubTopic, 2, false, pushStr)		fmt.Println("publish msg: ", pushStr, token.Error())		// token.Wait()		// time.Sleep(2 * time.Second)	}}//设备心跳func deviceHeartbeat(c MQTT.Client, pubTopic string) {	pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)	token := c.Publish(pubTopic, 2, false, pushStr)	fmt.Println("publish msg: ", pushStr, token.Error())	// token.Wait()	// go func() {	duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)	duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)	spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))	// for {	device := cron.New()	device.AddFunc(spec1, func() {		token := c.Publish(pubTopic, 2, false, pushStr)		fmt.Println("publish msg: ", pushStr, token.Error())		// token.Wait()	})	// }	device.Start()	// }()}// 准确率func equipmentAccuracyPush(c MQTT.Client, pubTopic string) {	tx := restful.Engine.NewSession()	defer tx.Close()	dataList, err := tx.SQL(`select t.tname tname,1-abs(sum(d1.actualweightminus)- sum(d1.lweight))/sum(d1.lweight) rate ,d.mydate rateDate  from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id  join tmr t on t.datacaptureno = d.datacaptureno 	where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )	 group by d.datacaptureno`).Query().List()	if err != nil {		logs.Error("feedtempletPush-error-1:", err)		return	}	pushStr := `{		"apiId": "getKPTData",		"param": {			"resultData": %s,			"farmId": %s,         			"method": "uploadrate",          			"rowCount": %d                  		}	}`	if len(dataList) > 0 {		b, _ := json.Marshal(dataList)		pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))		token := c.Publish(pubTopic, 2, false, pushStr)		fmt.Println("publish msg: ", pushStr, token.Error())		// token.Wait()		// time.Sleep(2 * time.Second)	}}// 完成重量func finishedWeightPush(c MQTT.Client, pubTopic string) {	tx := restful.Engine.NewSession()	defer tx.Close()	dataList, err := tx.SQL(`select  sum(d1.actualweightminus) CompleteWeught,sum(d1.lweight) planWeight,d.mydate weightDate 	 from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id  join tmr t on t.datacaptureno = d.datacaptureno 	where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )`).Query().List()	if err != nil {		logs.Error("feedtempletPush-error-1:", err)		return	}	pushStr := `{		"apiId": "getKPTData",		"param": {			"resultData": %s,			"farmId": %s,          			"method": "uploadweight",			"rowCount": "1"                 		}	}`	if len(dataList) > 0 {		b, _ := json.Marshal(dataList)		pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)		token := c.Publish(pubTopic, 2, false, pushStr)		fmt.Println("publish msg: ", pushStr, token.Error())		// token.Wait()		// time.Sleep(2 * time.Second)	}}// 完成车次func CompletedTrainNumberPush(c MQTT.Client, pubTopic string) {	tx := restful.Engine.NewSession()	defer tx.Close()	dataList, err := tx.SQL(`select (select count(1) from downloadedplan  where  DATE_FORMAT(mydate ,'%Y-%m-%d' ) =  DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )  )  planCar, 	(select count(1) from downloadedplan  where  DATE_FORMAT(mydate ,'%Y-%m-%d' ) =  DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and iscompleted = 1 ) CompleteCar ,	DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) carDate	`).Query().List()	if err != nil {		logs.Error("feedtempletPush-error-1:", err)		return	}	pushStr := `{		"apiId": "getKPTData",		"param": {			"resultData": %s,			"farmId": %s,       			"method": "uploadcarnumber",			"rowCount": "1"             		}	}`	if len(dataList) > 0 {		b, _ := json.Marshal(dataList)		pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)		token := c.Publish(pubTopic, 2, false, pushStr)		fmt.Println("publish msg: ", pushStr, token.Error())		// token.Wait()		// time.Sleep(2 * time.Second)	}}func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {	tx := restful.Engine.NewSession()	defer tx.Close()	data := make(map[string]interface{})	json.Unmarshal(msg.Payload(), &data)	if _, ok := data["feedData"]; ok {		for _, item := range data["feedData"].([]map[string]interface{}) {			tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS 			WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)			 ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()		}	} else if _, ok := data["barData"]; ok {		for _, item := range data["barData"].([]map[string]interface{}) {			tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS 				WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)				 ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()		}	}}
 |