mqtt.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. package api
  2. import (
  3. "bytes"
  4. "crypto/hmac"
  5. "crypto/sha1"
  6. "crypto/tls"
  7. "crypto/x509"
  8. "encoding/json"
  9. "fmt"
  10. "io/ioutil"
  11. "os"
  12. "strconv"
  13. "time"
  14. "../../pkg/logging"
  15. "../../pkg/setting"
  16. "../../routers/restful"
  17. "github.com/astaxie/beego/logs"
  18. MQTT "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/robfig/cron"
  20. )
  21. func InitMqttClient() {
  22. if setting.YynserverSetting.FarmId != "" {
  23. c, pubTopic := MqttClient()
  24. deviceHeartbeat(c, pubTopic)
  25. mqttCron := cron.New()
  26. mqttCron.AddFunc("10 06 * * *", func() {
  27. feedtempletPush(c, pubTopic)
  28. stirPush(c, pubTopic)
  29. dustingPush(c, pubTopic)
  30. equipmentAccuracyPush(c, pubTopic)
  31. finishedWeightPush(c, pubTopic)
  32. CompletedTrainNumberPush(c, pubTopic)
  33. })
  34. mqttCron.Start()
  35. }
  36. }
  37. func MqttClient() (MQTT.Client, string) {
  38. // set the device info, include product key, device name, and device secret
  39. // var productKey string = "a1NmXfrjL8M"
  40. // var deviceName string = "4776_p_breed"
  41. // var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4"
  42. // ProductKey
  43. // DeviceName
  44. // DeviceSecret
  45. var productKey string = setting.YynserverSetting.ProductKey
  46. var deviceName string = setting.YynserverSetting.DeviceName
  47. var deviceSecret string = setting.YynserverSetting.DeviceSecret
  48. // set timestamp, clientid, subscribe topic and publish topic
  49. var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)
  50. var clientId string = "go_device_id_0001"
  51. var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"
  52. var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"
  53. // set the login broker url
  54. var raw_broker bytes.Buffer
  55. raw_broker.WriteString("tls://")
  56. raw_broker.WriteString(productKey)
  57. raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")
  58. opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
  59. // calculate the login auth info, and set it into the connection options
  60. auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
  61. opts.SetClientID(auth.mqttClientId)
  62. opts.SetUsername(auth.username)
  63. opts.SetPassword(auth.password)
  64. opts.SetMaxReconnectInterval(1 * time.Second)
  65. opts.AutoReconnect = true
  66. // opts.SetKeepAlive(60 * 2 * time.Second)
  67. opts.OnConnect = func(c MQTT.Client) {
  68. if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
  69. logging.Error("mqtt Subscribe err: ", token.Error())
  70. os.Exit(1)
  71. }
  72. }
  73. // create and start a client using the above ClientOptions
  74. c := MQTT.NewClient(opts)
  75. if token := c.Connect(); token.Wait() && token.Error() != nil {
  76. logging.Error("mqtt Connect err: ", token.Error())
  77. }
  78. fmt.Print("Connect aliyun IoT Cloud Sucess\n")
  79. // if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
  80. // fmt.Println(token.Error())
  81. // os.Exit(1)
  82. // }
  83. // subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered
  84. // fmt.Println("Subscribe topic "+subTopic+" success\n", c.IsConnected())
  85. // publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update")
  86. // for i := 0; i < 50; i++ {
  87. // fmt.Println("publish msg:", i)
  88. // text := fmt.Sprintf("ABC #%d", i)
  89. // token := c.Publish(pubTopic, 0, false, text)
  90. // fmt.Println("publish msg: ", text)
  91. // token.Wait()
  92. // time.Sleep(2 * time.Second)
  93. // }
  94. // unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get")
  95. // if token := c.Unsubscribe(subTopic); token.Wait() && token.Error() != nil {
  96. // fmt.Println(token.Error())
  97. // os.Exit(1)
  98. // }
  99. // c.Disconnect(250)
  100. return c, pubTopic
  101. }
  102. func NewTLSConfig() *tls.Config {
  103. // Import trusted certificates from CAfile.pem.
  104. // Alternatively, manually add CA certificates to default openssl CA bundle.
  105. certpool := x509.NewCertPool()
  106. pemCerts, err := ioutil.ReadFile("./x509/root.pem")
  107. if err != nil {
  108. fmt.Println("0. read file error, game over!!")
  109. }
  110. certpool.AppendCertsFromPEM(pemCerts)
  111. // Create tls.Config with desired tls properties
  112. return &tls.Config{
  113. // RootCAs = certs used to verify server cert.
  114. RootCAs: certpool,
  115. // ClientAuth = whether to request cert from server.
  116. // Since the server is set up for SSL, this happens
  117. // anyways.
  118. ClientAuth: tls.NoClientCert,
  119. // ClientCAs = certs used to validate client cert.
  120. ClientCAs: nil,
  121. // InsecureSkipVerify = verify that cert contents
  122. // match server. IP matches what is in cert etc.
  123. InsecureSkipVerify: false,
  124. // Certificates = list of certs client sends to server.
  125. // Certificates: []tls.Certificate{cert},
  126. }
  127. }
  128. // define a function for the default message handler
  129. var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  130. fmt.Printf("TOPIC: %s\n", msg.Topic())
  131. fmt.Printf("MSG: %s\n", msg.Payload())
  132. }
  133. type AuthInfo struct {
  134. password, username, mqttClientId string
  135. }
  136. func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
  137. var raw_passwd bytes.Buffer
  138. raw_passwd.WriteString("clientId" + clientId)
  139. raw_passwd.WriteString("deviceName")
  140. raw_passwd.WriteString(deviceName)
  141. raw_passwd.WriteString("productKey")
  142. raw_passwd.WriteString(productKey)
  143. raw_passwd.WriteString("timestamp")
  144. raw_passwd.WriteString(timeStamp)
  145. fmt.Println(raw_passwd.String())
  146. // hmac, use sha1
  147. mac := hmac.New(sha1.New, []byte(deviceSecret))
  148. mac.Write([]byte(raw_passwd.String()))
  149. password := fmt.Sprintf("%02x", mac.Sum(nil))
  150. fmt.Println(password)
  151. username := deviceName + "&" + productKey
  152. var MQTTClientId bytes.Buffer
  153. MQTTClientId.WriteString(clientId)
  154. // hmac, use sha1; securemode=2 means TLS connection
  155. MQTTClientId.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
  156. MQTTClientId.WriteString(timeStamp)
  157. MQTTClientId.WriteString("|")
  158. auth := AuthInfo{password: password, username: username, mqttClientId: MQTTClientId.String()}
  159. return auth
  160. }
  161. func feedtempletPush(c MQTT.Client, pubTopic string) {
  162. tx := restful.Engine.NewSession()
  163. defer tx.Close()
  164. dataList, err := tx.SQL(`select f.id as recipeId , f.tname recipeName,ft.id ingId,ft.fname ingName,ft.fweight afQty,
  165. ft.sort mixNo,fd.allowratio allowableError ,fd.fclass ingType, ft.fweight * ( fd.dry /100 ) dmQty,null recipeCost
  166. from feedtemplet f join ftdetail ft on ft.ftid = f.id join feed fd on fd.id = ft.fid
  167. `).Query().List()
  168. if err != nil {
  169. logs.Error("feedtempletPush-error-1:", err)
  170. return
  171. }
  172. pushStr := `{
  173. "apiId": "getKPTData",
  174. "param": {
  175. "farmId": %s,
  176. "method":"getfeedtempletinfo",
  177. "rowCount": "1",
  178. "resultData":%s
  179. }
  180. }`
  181. if len(dataList) > 0 {
  182. b, _ := json.Marshal(dataList)
  183. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
  184. // c.Publish(pubTopic, 2, false, pushStr)
  185. token := c.Publish(pubTopic, 2, false, pushStr)
  186. fmt.Println("publish msg: ", pushStr, token.Error())
  187. // token.Wait()
  188. // time.Sleep(2 * time.Second)
  189. }
  190. }
  191. func stirPush(c MQTT.Client, pubTopic string) {
  192. tx := restful.Engine.NewSession()
  193. defer tx.Close()
  194. dataList, err := tx.SQL(`SELECT
  195. d.mydate dropDate,
  196. d.projname tmrNo,
  197. d.times loadShift,
  198. d.tempid recipeId,
  199. d.templetname recipeName,
  200. f.feedcode ingId,
  201. d1.fname ingName,
  202. f.fclass ingType,
  203. f.dry dmPct,
  204. d1.sort mixNo,
  205. d1.feedallowratio allowableError,
  206. d1.lweight expWeight,
  207. d1.actualweightminus actualWeight,
  208. d1.begintime startTime,
  209. d1.intime endTime
  210. FROM
  211. downloadedplan d
  212. JOIN downloadplandtl1 d1 ON d1.pid = d.id
  213. JOIN feed f ON f.feedcode = d1.feedcode
  214. AND f.pastureid = d.pastureid
  215. WHERE
  216. DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
  217. if err != nil {
  218. logs.Error("feedtempletPush-error-1:", err)
  219. return
  220. }
  221. pushStr := `{
  222. "apiId": "getKPTData",
  223. "param": {
  224. "farmId": %s,
  225. "method":"uploadadddata",
  226. "rowCount": "1",
  227. "resultData":%s
  228. }
  229. }`
  230. if len(dataList) > 0 {
  231. b, _ := json.Marshal(dataList)
  232. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
  233. token := c.Publish(pubTopic, 2, false, pushStr)
  234. fmt.Println("publish msg: ", pushStr, token.Error())
  235. // token.Wait()
  236. // time.Sleep(2 * time.Second)
  237. }
  238. }
  239. // 撒料信息
  240. func dustingPush(c MQTT.Client, pubTopic string) {
  241. tx := restful.Engine.NewSession()
  242. defer tx.Close()
  243. dataList, err := tx.SQL(`SELECT
  244. d.mydate dropDate,
  245. d.projname tmrNo,
  246. d.times loadShift,
  247. d2.fbarid penId,
  248. b.bcode penName,
  249. d2.cowcount cowCount,
  250. d2.sort feedingNo,
  251. d2.lweight expWeight,
  252. d2.actualweightminus actualWeight,
  253. d2.begintime startTime,
  254. d2.intime endTime
  255. FROM
  256. downloadedplan d
  257. JOIN downloadplandtl2 d2 ON d2.pid = d.id
  258. JOIN bar b ON b.id = d2.fbarid
  259. WHERE
  260. DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
  261. if err != nil {
  262. logs.Error("feedtempletPush-error-1:", err)
  263. return
  264. }
  265. pushStr := `{
  266. "apiId": "getKPTData",
  267. "param": {
  268. "farmId": %s,
  269. "method":"uploaddiliverdata",
  270. "rowCount": "1",
  271. "resultData":%s
  272. }
  273. }`
  274. if len(dataList) > 0 {
  275. b, _ := json.Marshal(dataList)
  276. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
  277. token := c.Publish(pubTopic, 2, false, pushStr)
  278. fmt.Println("publish msg: ", pushStr, token.Error())
  279. // token.Wait()
  280. // time.Sleep(2 * time.Second)
  281. }
  282. }
  283. //设备心跳
  284. func deviceHeartbeat(c MQTT.Client, pubTopic string) {
  285. pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
  286. token := c.Publish(pubTopic, 2, false, pushStr)
  287. fmt.Println("publish msg: ", pushStr, token.Error())
  288. // token.Wait()
  289. // go func() {
  290. duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)
  291. duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)
  292. spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))
  293. // for {
  294. device := cron.New()
  295. device.AddFunc(spec1, func() {
  296. token := c.Publish(pubTopic, 2, false, pushStr)
  297. fmt.Println("publish msg: ", pushStr, token.Error())
  298. // token.Wait()
  299. })
  300. // }
  301. device.Start()
  302. // }()
  303. }
  304. // 准确率
  305. func equipmentAccuracyPush(c MQTT.Client, pubTopic string) {
  306. tx := restful.Engine.NewSession()
  307. defer tx.Close()
  308. dataList, err := tx.SQL(`select t.tname tname,1-abs(sum(d1.actualweightminus)- sum(d1.lweight))/sum(d1.lweight) rate ,d.mydate rateDate from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno
  309. where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )
  310. group by d.datacaptureno`).Query().List()
  311. if err != nil {
  312. logs.Error("feedtempletPush-error-1:", err)
  313. return
  314. }
  315. pushStr := `{
  316. "apiId": "getKPTData",
  317. "param": {
  318. "resultData": %s,
  319. "farmId": %s,
  320. "method": "uploadrate",
  321. "rowCount": %d
  322. }
  323. }`
  324. if len(dataList) > 0 {
  325. b, _ := json.Marshal(dataList)
  326. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))
  327. token := c.Publish(pubTopic, 2, false, pushStr)
  328. fmt.Println("publish msg: ", pushStr, token.Error())
  329. // token.Wait()
  330. // time.Sleep(2 * time.Second)
  331. }
  332. }
  333. // 完成重量
  334. func finishedWeightPush(c MQTT.Client, pubTopic string) {
  335. tx := restful.Engine.NewSession()
  336. defer tx.Close()
  337. dataList, err := tx.SQL(`select sum(d1.actualweightminus) CompleteWeught,sum(d1.lweight) planWeight,d.mydate weightDate
  338. from downloadedplan d join downloadplandtl1 d1 on d1.pid = d.id join tmr t on t.datacaptureno = d.datacaptureno
  339. where DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' )`).Query().List()
  340. if err != nil {
  341. logs.Error("feedtempletPush-error-1:", err)
  342. return
  343. }
  344. pushStr := `{
  345. "apiId": "getKPTData",
  346. "param": {
  347. "resultData": %s,
  348. "farmId": %s,
  349. "method": "uploadweight",
  350. "rowCount": "1"
  351. }
  352. }`
  353. if len(dataList) > 0 {
  354. b, _ := json.Marshal(dataList)
  355. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
  356. token := c.Publish(pubTopic, 2, false, pushStr)
  357. fmt.Println("publish msg: ", pushStr, token.Error())
  358. // token.Wait()
  359. // time.Sleep(2 * time.Second)
  360. }
  361. }
  362. // 完成车次
  363. func CompletedTrainNumberPush(c MQTT.Client, pubTopic string) {
  364. tx := restful.Engine.NewSession()
  365. defer tx.Close()
  366. dataList, err := tx.SQL(`select (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) ) planCar,
  367. (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and iscompleted = 1 ) CompleteCar ,
  368. DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) carDate
  369. `).Query().List()
  370. if err != nil {
  371. logs.Error("feedtempletPush-error-1:", err)
  372. return
  373. }
  374. pushStr := `{
  375. "apiId": "getKPTData",
  376. "param": {
  377. "resultData": %s,
  378. "farmId": %s,
  379. "method": "uploadcarnumber",
  380. "rowCount": "1"
  381. }
  382. }`
  383. if len(dataList) > 0 {
  384. b, _ := json.Marshal(dataList)
  385. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
  386. token := c.Publish(pubTopic, 2, false, pushStr)
  387. fmt.Println("publish msg: ", pushStr, token.Error())
  388. // token.Wait()
  389. // time.Sleep(2 * time.Second)
  390. }
  391. }
  392. func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
  393. tx := restful.Engine.NewSession()
  394. defer tx.Close()
  395. data := make(map[string]interface{})
  396. json.Unmarshal(msg.Payload(), &data)
  397. if _, ok := data["feedData"]; ok {
  398. for _, item := range data["feedData"].([]map[string]interface{}) {
  399. tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
  400. WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
  401. ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
  402. }
  403. } else if _, ok := data["barData"]; ok {
  404. for _, item := range data["barData"].([]map[string]interface{}) {
  405. tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
  406. WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
  407. ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
  408. }
  409. }
  410. }