mqtt.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. package api
  2. import (
  3. "bytes"
  4. "crypto/hmac"
  5. "crypto/sha1"
  6. "crypto/tls"
  7. "crypto/x509"
  8. "encoding/json"
  9. "fmt"
  10. "io/ioutil"
  11. "os"
  12. "strconv"
  13. "time"
  14. "tmr-watch/conf/setting"
  15. "tmr-watch/http/handle/restful"
  16. "tmr-watch/pkg/logging"
  17. "github.com/astaxie/beego/logs"
  18. MQTT "github.com/eclipse/paho.mqtt.golang"
  19. mqtt "github.com/eclipse/paho.mqtt.golang"
  20. "github.com/robfig/cron"
  21. )
  22. func InitMqttClient() {
  23. if setting.YynserverSetting.FarmId != "" {
  24. c, pubTopic := MqttClient()
  25. deviceHeartbeat(c, pubTopic)
  26. // mqttCron := cron.New()
  27. // mqttCron.AddFunc("10 06 * * *", func() {
  28. // feedtempletPush(c, pubTopic)
  29. stirPush(c, pubTopic)
  30. dustingPush(c, pubTopic)
  31. // equipmentAccuracyPush(c, pubTopic)
  32. // finishedWeightPush(c, pubTopic)
  33. // CompletedTrainNumberPush(c, pubTopic)
  34. // })
  35. // mqttCron.Start()
  36. }
  37. }
  38. func MqttClient() (MQTT.Client, string) {
  39. // set the device info, include product key, device name, and device secret
  40. // var productKey string = "a1NmXfrjL8M"
  41. // var deviceName string = "4776_p_breed"
  42. // var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4"
  43. // ProductKey
  44. // DeviceName
  45. // DeviceSecret
  46. var productKey string = setting.YynserverSetting.ProductKey
  47. var deviceName string = setting.YynserverSetting.DeviceName
  48. var deviceSecret string = setting.YynserverSetting.DeviceSecret
  49. // FarmId = "1830004623"
  50. // ProductKey = "k03txxLKFae"
  51. // DeviceName = "4623_p_breed"
  52. // DeviceSecret = "d06ababb2b10ba25bca3041e35ac604d"
  53. // HeartBeat = "18300046234623_p_breed"
  54. // set timestamp, clientid, subscribe topic and publish topic
  55. var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)
  56. var clientId string = "go1708496085"
  57. var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"
  58. var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"
  59. // set the login broker url
  60. var raw_broker bytes.Buffer
  61. raw_broker.WriteString("tcp://")
  62. // raw_broker.WriteString(productKey)
  63. raw_broker.WriteString("iot-010a5xth.mqtt.iothub.aliyuncs.com:1883")
  64. opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
  65. // calculate the login auth info, and set it into the connection options
  66. auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
  67. opts.SetClientID(auth.mqttClientId)
  68. opts.SetUsername(auth.username)
  69. opts.SetPassword(auth.password)
  70. opts.SetMaxReconnectInterval(1 * time.Second)
  71. opts.AutoReconnect = true
  72. // opts.SetKeepAlive(60 * 2 * time.Second)
  73. opts.OnConnect = func(c MQTT.Client) {
  74. if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
  75. logging.Error("mqtt Subscribe err: ", token.Error())
  76. os.Exit(1)
  77. }
  78. }
  79. c := mqtt.NewClient(opts)
  80. if token := c.Connect(); token.Wait() && token.Error() != nil {
  81. fmt.Println(token.Error())
  82. os.Exit(1)
  83. }
  84. fmt.Print("Connect aliyun IoT Cloud Sucess\n")
  85. return c, pubTopic
  86. }
  // NewTLSConfig returns a TLS configuration whose trusted roots are the CA
  // certificates found in ./x509/root.pem.
  //
  // The result is never nil. When the PEM file cannot be read, or no
  // certificate can be parsed from it, the problem is reported on stdout and
  // the returned config carries an empty root pool.
  func NewTLSConfig() *tls.Config {
  	const caFile = "./x509/root.pem"

  	certpool := x509.NewCertPool()
  	pemCerts, err := ioutil.ReadFile(caFile)
  	switch {
  	case err != nil:
  		// Include the cause so a missing or unreadable file can be diagnosed.
  		fmt.Println("0. read file error, game over!!", err)
  	case !certpool.AppendCertsFromPEM(pemCerts):
  		// AppendCertsFromPEM reports false when nothing usable was parsed.
  		fmt.Println("no CA certificate could be parsed from", caFile)
  	}

  	return &tls.Config{
  		// RootCAs: certificates used to verify the server certificate.
  		RootCAs: certpool,
  		// ClientAuth and ClientCAs are server-side settings in crypto/tls;
  		// they are kept at their zero values here.
  		ClientAuth: tls.NoClientCert,
  		ClientCAs:  nil,
  		// Keep certificate and host name verification enabled.
  		InsecureSkipVerify: false,
  		// No client certificate is sent.
  		// Certificates: []tls.Certificate{cert},
  	}
  }
  113. // define a function for the default message handler
  114. var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  115. fmt.Printf("TOPIC: %s\n", msg.Topic())
  116. fmt.Printf("MSG: %s\n", msg.Payload())
  117. }
  // AuthInfo holds the three values used to log in to the MQTT broker.
  type AuthInfo struct {
  	password, username, mqttClientId string
  }

  // calculate_sign derives the MQTT login credentials for a device.
  //
  //   - password is the lowercase hex HMAC-SHA1, keyed with deviceSecret, of
  //     "clientId<clientId>deviceName<deviceName>productKey<productKey>timestamp<timeStamp>".
  //   - username is "<deviceName>&<productKey>".
  //   - mqttClientId is clientId followed by the connection parameters
  //     (securemode, signmethod, timestamp) between "|" separators.
  //
  // Neither the signed content nor the resulting password is printed: both are
  // credential material and must not reach stdout or log files.
  func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
  	// Content to sign; the field order is part of the signature.
  	var rawPasswd bytes.Buffer
  	rawPasswd.WriteString("clientId" + clientId)
  	rawPasswd.WriteString("deviceName")
  	rawPasswd.WriteString(deviceName)
  	rawPasswd.WriteString("productKey")
  	rawPasswd.WriteString(productKey)
  	rawPasswd.WriteString("timestamp")
  	rawPasswd.WriteString(timeStamp)

  	// HMAC-SHA1 keyed with the device secret, hex encoded.
  	mac := hmac.New(sha1.New, []byte(deviceSecret))
  	mac.Write(rawPasswd.Bytes())
  	password := fmt.Sprintf("%x", mac.Sum(nil))

  	// NOTE(review): the original comment says securemode=2 means a TLS
  	// connection, while MqttClient dials a tcp:// broker URL - confirm the
  	// intended mode with the broker documentation.
  	var mqttClientID bytes.Buffer
  	mqttClientID.WriteString(clientId)
  	mqttClientID.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
  	mqttClientID.WriteString(timeStamp)
  	mqttClientID.WriteString("|")

  	return AuthInfo{
  		password:     password,
  		username:     deviceName + "&" + productKey,
  		mqttClientId: mqttClientID.String(),
  	}
  }
  146. func feedtempletPush(c MQTT.Client, pubTopic string) {
  147. tx := restful.Engine.NewSession()
  148. defer tx.Close()
  149. dataList, err := tx.SQL(`SELECT
  150. f.id AS recipeId,
  151. f.tname recipeName,
  152. ft.id ingId,
  153. ifnull(fd.fname,fdy.fname) ingName,
  154. if(fd.fname is not null, ft.fweight,fty.fweight) afQty,
  155. ft.sort mixNo,
  156. ifnull(fd.allowratio,fdy.allowratio) allowableError,
  157. ifnull(fd.fclass,fdy.fclass) ingType,
  158. if(fd.fname is not null,ft.fweight * ( fd.dry / 100 ), fty.fweight * ( fdy.dry / 100 )) dmQty,
  159. NULL recipeCost
  160. FROM
  161. feedtemplet f
  162. JOIN ftdetail ft ON ft.ftid = f.id
  163. left JOIN feed fd ON fd.id = ft.fid
  164. left join feedtemplet fy on fy.id = ft.preftid
  165. left join ftdetail fty on fty.ftid = fy.id
  166. left JOIN feed fdy ON fdy.id = fty.fid
  167. `).Query().List()
  168. if err != nil {
  169. logs.Error("feedtempletPush-error-1:", err)
  170. return
  171. }
  172. pushStr := `{
  173. "apiId": "getKPTData",
  174. "param": {
  175. "farmId": %s,
  176. "method":"getfeedtempletinfo",
  177. "rowCount": "1",
  178. "resultData":%s
  179. }
  180. }`
  181. if len(dataList) > 0 {
  182. b, _ := json.Marshal(dataList)
  183. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
  184. // c.Publish(pubTopic, 2, false, pushStr)
  185. token := c.Publish(pubTopic, 2, false, pushStr)
  186. fmt.Println("publish msg: ", pushStr, token.Error())
  187. token.Wait()
  188. // time.Sleep(2 * time.Second)
  189. }
  190. }
  191. func stirPush(c MQTT.Client, pubTopic string) {
  192. tx := restful.Engine.NewSession()
  193. defer tx.Close()
  194. dataList, err := tx.SQL(`SELECT
  195. d.mydate dropDate,
  196. d.projname tmrNo,
  197. d.times loadShift,
  198. d.tempid recipeId,
  199. d.templetname recipeName,
  200. f.feedcode ingId,
  201. d1.fname ingName,
  202. f.fclass ingType,
  203. f.dry dmPct,
  204. d1.sort mixNo,
  205. d1.feedallowratio allowableError,
  206. d1.lweight expWeight,
  207. d1.actualweightminus actualWeight,
  208. d1.begintime startTime,
  209. d1.intime endTime
  210. FROM
  211. downloadedplan d
  212. JOIN downloadplandtl1 d1 ON d1.pid = d.id
  213. JOIN feed f ON f.feedcode = d1.feedcode
  214. AND f.pastureid = d.pastureid
  215. WHERE
  216. DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
  217. if err != nil {
  218. logs.Error("feedtempletPush-error-1:", err)
  219. return
  220. }
  221. pushStr := `{
  222. "apiId": "getKPTData",
  223. "param": {
  224. "farmId": %s,
  225. "method":"uploadadddata",
  226. "rowCount": "1",
  227. "resultData":%s
  228. }
  229. }`
  230. if len(dataList) > 0 {
  231. b, _ := json.Marshal(dataList)
  232. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
  233. token := c.Publish(pubTopic, 2, false, pushStr)
  234. fmt.Println("publish msg: ", pushStr, token.Error())
  235. // token.Wait()
  236. // time.Sleep(2 * time.Second)
  237. }
  238. }
  239. // 撒料信息
  240. func dustingPush(c MQTT.Client, pubTopic string) {
  241. tx := restful.Engine.NewSession()
  242. defer tx.Close()
  243. dataList, err := tx.SQL(`SELECT
  244. d.mydate dropDate,
  245. d.projname tmrNo,
  246. d.times loadShift,
  247. d2.fbarid penId,
  248. b.bcode penName,
  249. d2.cowcount cowCount,
  250. d2.sort feedingNo,
  251. d2.lweight expWeight,
  252. d2.actualweightminus actualWeight,
  253. d2.begintime startTime,
  254. d2.intime endTime,ifnull(driver.drivername,'')tmrName
  255. FROM
  256. downloadedplan d
  257. JOIN downloadplandtl2 d2 ON d2.pid = d.id
  258. JOIN bar b ON b.id = d2.fbarid
  259. join tmr t on t.id = d.tmrid
  260. left join driver on driver.drivercode = t.eqcode
  261. WHERE
  262. DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1) ,'%Y-%m-%d' )`).Query().List()
  263. if err != nil {
  264. logs.Error("feedtempletPush-error-1:", err)
  265. return
  266. }
  267. pushStr := `{
  268. "apiId": "getKPTData",
  269. "param": {
  270. "farmId": %s,
  271. "method":"uploaddiliverdata",
  272. "rowCount": "1",
  273. "resultData":%s
  274. }
  275. }`
  276. if len(dataList) > 0 {
  277. b, _ := json.Marshal(dataList)
  278. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
  279. token := c.Publish(pubTopic, 2, false, pushStr)
  280. fmt.Println("publish msg: ", pushStr, token.Error())
  281. // token.Wait()
  282. // time.Sleep(2 * time.Second)
  283. }
  284. }
  285. //设备心跳
  286. func deviceHeartbeat(c MQTT.Client, pubTopic string) {
  287. pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
  288. token := c.Publish(pubTopic, 2, false, pushStr)
  289. fmt.Println("publish msg: ", pushStr, token.Error())
  290. // token.Wait()
  291. // go func() {
  292. duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)
  293. duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)
  294. spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))
  295. // for {
  296. device := cron.New()
  297. device.AddFunc(spec1, func() {
  298. token := c.Publish(pubTopic, 2, false, pushStr)
  299. fmt.Println("publish msg: ", pushStr, token.Error(), time.Now())
  300. token.Wait()
  301. })
  302. // }
  303. device.Start()
  304. // }()
  305. }
  306. // 准确率
  307. func equipmentAccuracyPush(c MQTT.Client, pubTopic string) {
  308. tx := restful.Engine.NewSession()
  309. defer tx.Close()
  310. dataList, err := tx.SQL(`SELECT
  311. t.tname tname,
  312. 1-abs (
  313. sum( d1.actualweightminus )- sum( d1.lweight ))/ sum( d1.lweight ) rate,
  314. d.mydate rateDate
  315. FROM
  316. downloadedplan d
  317. JOIN downloadplandtl1 d1 ON d1.pid = d.id
  318. JOIN tmr t ON t.datacaptureno = d.datacaptureno
  319. WHERE
  320. DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = DATE_FORMAT( subdate( now(), 1 ), '%Y-%m-%d' ) and d.lpplantype in(0,1)
  321. GROUP BY
  322. d.datacaptureno`).Query().List()
  323. if err != nil {
  324. logs.Error("feedtempletPush-error-1:", err)
  325. return
  326. }
  327. pushStr := `{
  328. "apiId": "getKPTData",
  329. "param": {
  330. "resultData": %s,
  331. "farmId": %s,
  332. "method": "uploadrate",
  333. "rowCount": %d
  334. }
  335. }`
  336. if len(dataList) > 0 {
  337. b, _ := json.Marshal(dataList)
  338. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))
  339. token := c.Publish(pubTopic, 2, false, pushStr)
  340. fmt.Println("publish msg: ", pushStr, token.Error())
  341. // token.Wait()
  342. // time.Sleep(2 * time.Second)
  343. }
  344. }
  345. // 完成重量
  346. func finishedWeightPush(c MQTT.Client, pubTopic string) {
  347. tx := restful.Engine.NewSession()
  348. defer tx.Close()
  349. dataList, err := tx.SQL(`SELECT
  350. sum( d1.actualweightminus ) CompleteWeught,
  351. sum( d1.lweight ) planWeight,
  352. d.mydate weightDate
  353. FROM
  354. downloadedplan d
  355. JOIN downloadplandtl1 d1 ON d1.pid = d.id
  356. JOIN tmr t ON t.datacaptureno = d.datacaptureno
  357. WHERE
  358. DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = DATE_FORMAT(
  359. subdate( now(), 1 ),
  360. '%Y-%m-%d'
  361. ) and lpplantype in(0,1)`).Query().List()
  362. if err != nil {
  363. logs.Error("feedtempletPush-error-1:", err)
  364. return
  365. }
  366. pushStr := `{
  367. "apiId": "getKPTData",
  368. "param": {
  369. "resultData": %s,
  370. "farmId": %s,
  371. "method": "uploadweight",
  372. "rowCount": "1"
  373. }
  374. }`
  375. if len(dataList) > 0 {
  376. b, _ := json.Marshal(dataList)
  377. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
  378. token := c.Publish(pubTopic, 2, false, pushStr)
  379. fmt.Println("publish msg: ", pushStr, token.Error())
  380. // token.Wait()
  381. // time.Sleep(2 * time.Second)
  382. }
  383. }
  384. // 完成车次
  385. func CompletedTrainNumberPush(c MQTT.Client, pubTopic string) {
  386. tx := restful.Engine.NewSession()
  387. defer tx.Close()
  388. dataList, err := tx.SQL(` select (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and lpplantype in(0,1) ) planCar,
  389. (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) and iscompleted = 1 and lpplantype in(0,1)) CompleteCar ,
  390. DATE_FORMAT(subdate(now(),1),'%Y-%m-%d' ) carDate`).Query().List()
  391. if err != nil {
  392. logs.Error("feedtempletPush-error-1:", err)
  393. return
  394. }
  395. pushStr := `{
  396. "apiId": "getKPTData",
  397. "param": {
  398. "resultData": %s,
  399. "farmId": %s,
  400. "method": "uploadcarnumber",
  401. "rowCount": "1"
  402. }
  403. }`
  404. if len(dataList) > 0 {
  405. b, _ := json.Marshal(dataList)
  406. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
  407. token := c.Publish(pubTopic, 2, false, pushStr)
  408. fmt.Println("publish msg: ", pushStr, token.Error())
  409. // token.Wait()
  410. // time.Sleep(2 * time.Second)
  411. }
  412. }
  413. func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
  414. tx := restful.Engine.NewSession()
  415. defer tx.Close()
  416. data := make(map[string]interface{})
  417. json.Unmarshal(msg.Payload(), &data)
  418. if _, ok := data["feedData"]; ok {
  419. for _, item := range data["feedData"].([]map[string]interface{}) {
  420. tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
  421. WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
  422. ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
  423. }
  424. } else if _, ok := data["barData"]; ok {
  425. for _, item := range data["barData"].([]map[string]interface{}) {
  426. tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
  427. WHERE table_name = 'recweight' AND table_schema = 'tmrwatch3' AND column_name = 'pastureid'),?,?)
  428. ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
  429. }
  430. }
  431. }