mqtt.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782
  1. package api
  2. import (
  3. "bytes"
  4. "crypto/hmac"
  5. "crypto/sha1"
  6. "crypto/tls"
  7. "crypto/x509"
  8. "encoding/json"
  9. "fmt"
  10. "io/ioutil"
  11. "net/http"
  12. "os"
  13. "strconv"
  14. "time"
  15. "tmr-watch/conf/setting"
  16. "tmr-watch/http/handle/restful"
  17. "tmr-watch/models"
  18. "tmr-watch/pkg/app"
  19. "tmr-watch/pkg/e"
  20. "tmr-watch/pkg/logging"
  21. "github.com/Anderson-Lu/gofasion/gofasion"
  22. "github.com/astaxie/beego/logs"
  23. MQTT "github.com/eclipse/paho.mqtt.golang"
  24. mqtt "github.com/eclipse/paho.mqtt.golang"
  25. "github.com/gin-gonic/gin"
  26. "github.com/robfig/cron"
  27. )
  28. var c mqtt.Client
  29. var pubTopic string
  30. func InitMqttClient() {
  31. if setting.YynserverSetting.FarmId != "" {
  32. c, pubTopic = MqttClient()
  33. deviceHeartbeat(c, pubTopic)
  34. // GetFeedDataFromApi()
  35. // getBarDataFromApi()
  36. // i := 50
  37. // for {
  38. // i--
  39. // if i == 0 {
  40. // break
  41. // }
  42. // now := time.Now().AddDate(0, 0, -i).Format("2006-01-02")
  43. // // now := "2024-05-23"
  44. // fmt.Println(now, time.Now())
  45. // stirPush(c, pubTopic, now)
  46. // dustingPush(c, pubTopic, now)
  47. // // equipmentAccuracyPush(c, pubTopic, now)
  48. // // finishedWeightPush(c, pubTopic, now)
  49. // // feedtempletPush(c, pubTopic)
  50. // // CompletedTrainNumberPush(c, pubTopic, now)
  51. // }
  52. // now := "2024-05-01"
  53. // stirPush(c, pubTopic, now)
  54. // dustingPush(c, pubTopic, now)
  55. // equipmentAccuracyPush(c, pubTopic, now)
  56. // finishedWeightPush(c, pubTopic, now)
  57. // feedtempletPush(c, pubTopic)
  58. // CompletedTrainNumberPush(c, pubTopic, now)
  59. // stirPush(c, pubTopic, "2024-07-01")
  60. // equipmentAccuracyPush(c, pubTopic, "2024-07-01")
  61. // finishedWeightPush(c, pubTopic, "2024-07-01")
  62. // feedtempletPush(c, pubTopic)
  63. // CompletedTrainNumberPush(c, pubTopic, "2024-07-01")
  64. mqttCron := cron.New()
  65. mqttCron.AddFunc("10 07 * * *", func() {
  66. now := time.Now().AddDate(0, 0, -1).Format("2006-01-02")
  67. GetFeedDataFromApi()
  68. stirPush(c, pubTopic, now)
  69. dustingPush(c, pubTopic, now)
  70. equipmentAccuracyPush(c, pubTopic, now)
  71. finishedWeightPush(c, pubTopic, now)
  72. feedtempletPush(c, pubTopic)
  73. CompletedTrainNumberPush(c, pubTopic, now)
  74. getBarDataFromApi()
  75. })
  76. mqttCron.Start()
  77. }
  78. }
  79. func MqttClient() (MQTT.Client, string) {
  80. // set the device info, include product key, device name, and device secret
  81. // var productKey string = "a1NmXfrjL8M"
  82. // var deviceName string = "4776_p_breed"
  83. // var deviceSecret string = "c2591b89adff22e1c9f0fc03363f56a4"
  84. // ProductKey
  85. // DeviceName
  86. // DeviceSecret
  87. var productKey string = setting.YynserverSetting.ProductKey
  88. var deviceName string = setting.YynserverSetting.DeviceName
  89. var deviceSecret string = setting.YynserverSetting.DeviceSecret
  90. // product_key =k03txxLKFae
  91. // device_name =4623_p_breed
  92. // device_secret =d06ababb2b10ba25bca3041e35ac604d
  93. // host = iot-010a5xth.mqtt.iothub.aliyuncs.com
  94. // farmId=1830004623
  95. // heartBeat=18300046234623_p_breed
  96. // TopicName=/k03txxLKFae/4623_p_breed/user/heatwatch/tmrBreed/post
  97. var timeStamp string = strconv.FormatInt(time.Now().UnixNano(), 10)
  98. var clientId string = "go" + setting.YynserverSetting.FarmId
  99. var subTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/get"
  100. var pubTopic string = "/" + productKey + "/" + deviceName + "/user/heatwatch/tmrBreed/post"
  101. // set the login broker url
  102. var raw_broker bytes.Buffer
  103. raw_broker.WriteString("tcp://")
  104. raw_broker.WriteString("iot-010a5xth.mqtt.iothub.aliyuncs.com:1883")
  105. opts := MQTT.NewClientOptions().AddBroker(raw_broker.String())
  106. // calculate the login auth info, and set it into the connection options
  107. auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
  108. opts.SetClientID(auth.mqttClientId)
  109. opts.SetUsername(auth.username)
  110. opts.SetPassword(auth.password)
  111. opts.SetMaxReconnectInterval(1 * time.Second)
  112. opts.AutoReconnect = true
  113. // opts.SetKeepAlive(60 * 2 * time.Second)
  114. opts.OnConnect = func(c MQTT.Client) {
  115. if token := c.Subscribe(subTopic, 0, feedHeatwatch); token.Wait() && token.Error() != nil {
  116. logging.Error("mqtt Subscribe err: ", token.Error())
  117. os.Exit(1)
  118. }
  119. }
  120. c := mqtt.NewClient(opts)
  121. if token := c.Connect(); token.Wait() && token.Error() != nil {
  122. fmt.Println(token.Error())
  123. os.Exit(1)
  124. }
  125. fmt.Print("Connect aliyun IoT Cloud Sucess\n")
  126. return c, pubTopic
  127. }
  128. func NewTLSConfig() *tls.Config {
  129. // Import trusted certificates from CAfile.pem.
  130. // Alternatively, manually add CA certificates to default openssl CA bundle.
  131. certpool := x509.NewCertPool()
  132. pemCerts, err := ioutil.ReadFile("./x509/root.pem")
  133. if err != nil {
  134. fmt.Println("0. read file error, game over!!")
  135. }
  136. certpool.AppendCertsFromPEM(pemCerts)
  137. // Create tls.Config with desired tls properties
  138. return &tls.Config{
  139. // RootCAs = certs used to verify server cert.
  140. RootCAs: certpool,
  141. // ClientAuth = whether to request cert from server.
  142. // Since the server is set up for SSL, this happens
  143. // anyways.
  144. ClientAuth: tls.NoClientCert,
  145. // ClientCAs = certs used to validate client cert.
  146. ClientCAs: nil,
  147. // InsecureSkipVerify = verify that cert contents
  148. // match server. IP matches what is in cert etc.
  149. InsecureSkipVerify: false,
  150. // Certificates = list of certs client sends to server.
  151. // Certificates: []tls.Certificate{cert},
  152. }
  153. }
  154. // define a function for the default message handler
  155. var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  156. fmt.Printf("TOPIC: %s\n", msg.Topic())
  157. fmt.Printf("MSG: %s\n", msg.Payload())
  158. }
  159. type AuthInfo struct {
  160. password, username, mqttClientId string
  161. }
  162. func calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp string) AuthInfo {
  163. var raw_passwd bytes.Buffer
  164. raw_passwd.WriteString("clientId" + clientId)
  165. raw_passwd.WriteString("deviceName")
  166. raw_passwd.WriteString(deviceName)
  167. raw_passwd.WriteString("productKey")
  168. raw_passwd.WriteString(productKey)
  169. raw_passwd.WriteString("timestamp")
  170. raw_passwd.WriteString(timeStamp)
  171. fmt.Println(raw_passwd.String())
  172. // hmac, use sha1
  173. mac := hmac.New(sha1.New, []byte(deviceSecret))
  174. mac.Write([]byte(raw_passwd.String()))
  175. password := fmt.Sprintf("%02x", mac.Sum(nil))
  176. fmt.Println(password)
  177. username := deviceName + "&" + productKey
  178. var MQTTClientId bytes.Buffer
  179. MQTTClientId.WriteString(clientId)
  180. // hmac, use sha1; securemode=2 means TLS connection
  181. MQTTClientId.WriteString("|securemode=2,_v=paho-go-1.0.0,signmethod=hmacsha1,timestamp=")
  182. MQTTClientId.WriteString(timeStamp)
  183. MQTTClientId.WriteString("|")
  184. auth := AuthInfo{password: password, username: username, mqttClientId: MQTTClientId.String()}
  185. return auth
  186. }
  187. func feedtempletPush(c MQTT.Client, pubTopic string) {
  188. tx := restful.Engine.NewSession()
  189. defer tx.Close()
  190. dataList, err := tx.SQL(`SELECT
  191. f.id AS recipeId,
  192. f.tname recipeName,
  193. ft.id ingId,
  194. ifnull(fd.fname,fdy.fname) ingName,
  195. if(fd.fname is not null, ft.fweight,fty.fweight) afQty,
  196. ft.sort mixNo,
  197. ifnull(fd.allowratio,fdy.allowratio) allowableError,
  198. ifnull(fd.fclass,fdy.fclass) ingType,
  199. if(fd.fname is not null,ft.fweight * ( fd.dry / 100 ), fty.fweight * ( fdy.dry / 100 )) dmQty,
  200. '' recipeCost
  201. FROM
  202. feedtemplet f
  203. JOIN ftdetail ft ON ft.ftid = f.id
  204. left JOIN feed fd ON fd.id = ft.fid
  205. left join feedtemplet fy on fy.id = ft.preftid
  206. left join ftdetail fty on fty.ftid = fy.id
  207. left JOIN feed fdy ON fdy.id = fty.fid
  208. `).QueryString()
  209. if err != nil {
  210. logs.Error("feedtempletPush-error-1:", err)
  211. return
  212. }
  213. pushStr := `{
  214. "apiId": "getKPTData",
  215. "param": {
  216. "farmId": "%s",
  217. "method":"getfeedtempletinfo",
  218. "rowCount": "1",
  219. "resultData":%s
  220. }
  221. }`
  222. if len(dataList) > 0 {
  223. b, _ := json.Marshal(dataList)
  224. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, string(b))
  225. // c.Publish(pubTopic, 2, false, pushStr)
  226. token := c.Publish(pubTopic, 2, false, pushStr)
  227. fmt.Println("publish msg: ", pushStr, token.Error())
  228. token.Wait()
  229. // time.Sleep(2 * time.Second)
  230. }
  231. }
  232. func stirPush(c MQTT.Client, pubTopic, date string) {
  233. tx := restful.Engine.NewSession()
  234. defer tx.Close()
  235. //f.sapCode ingId,
  236. dataList, err := tx.SQL(`SELECT
  237. DATE_FORMAT(d.mydate,'%Y-%m-%d') loadDate,
  238. d.sort tmrNo,
  239. d.times loadShift,
  240. d.tempid recipeId,
  241. d.templetname recipeName,
  242. if( f.backup2 <> '',f.backup2 ,f.feedcode) ingId,
  243. d1.fname ingName,
  244. 12 ingType,
  245. f.dry dmPct,
  246. d1.sort mixNo,
  247. d1.feedallowratio allowable_error,
  248. d1.lweight expWeight,
  249. d1.actualweightminus actualWeight,
  250. if((select count(1) from downloadplandtl1 where pid = d.id and sort < d1.sort order by sort desc) >0 ,(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl1 where pid = d.id and sort < d1.sort order by sort desc limit 1),DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') ) startTime,
  251. DATE_FORMAT(d1.intime,'%Y-%m-%d %H:%i:%s') endTime , ifnull(if(d.driverId !=0 ,(select drivername from driver where id = d.driverId),(SELECT dr.driver FROM dutyrecord dr
  252. WHERE dr.pastureid = d.pastureid AND dr.eqid = d.tmrid and dr.times= d.times AND dr.operatetime <=d.mydate
  253. ORDER BY dr.operatetime DESC LIMIT 1)),'')tmrName
  254. FROM
  255. downloadedplan d
  256. JOIN downloadplandtl1 d1 ON d1.pid = d.id
  257. JOIN feed f ON f.id = d1.fid
  258. AND f.pastureid = d.pastureid
  259. WHERE
  260. DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = ? and d1.type = 0 and (f.feedcode LIKE '%00000000%' or f.backup2 <> '' ) `, date).QueryString()
  261. // DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = ? and f.sapCode is not null and f.sapCode != '' `, date).QueryString()
  262. if err != nil {
  263. logs.Error("feedtempletPush-error-1:", err)
  264. return
  265. }
  266. pushStr := `{
  267. "apiId": "getKPTData",
  268. "param": {
  269. "farmId": "%s",
  270. "method":"uploadadddata",
  271. "rowCount": "%d",
  272. "resultData":%s
  273. }
  274. }`
  275. if len(dataList) > 0 {
  276. b, _ := json.Marshal(dataList)
  277. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, len(dataList), string(b))
  278. fmt.Println(pushStr)
  279. token := c.Publish(pubTopic, 2, false, pushStr)
  280. fmt.Println("publish msg: ", pushStr, token.Error())
  281. // token.Wait()
  282. // time.Sleep(2 * time.Second)
  283. }
  284. }
  285. // 撒料信息
  286. func dustingPush(c MQTT.Client, pubTopic, date string) {
  287. tx := restful.Engine.NewSession()
  288. defer tx.Close()
  289. dataList, err := tx.SQL(`SELECT
  290. ifnull(if(d.driverId !=0 ,(select drivername from driver where id = d.driverId),(SELECT dr.driver FROM dutyrecord dr
  291. WHERE dr.pastureid = d.pastureid AND dr.eqid = d.tmrid and dr.times= d.times AND dr.operatetime <=d.mydate
  292. ORDER BY dr.operatetime DESC LIMIT 1)),'')tmrName ,
  293. DATE_FORMAT(d.mydate,'%Y-%m-%d') dropDate,
  294. d.sort tmrNo,
  295. d.times dropShift,
  296. d2.fbarid penId,
  297. b.bname penName,
  298. fp.ccount cowCount,
  299. d2.sort feedingNo,
  300. d2.lweight expWeight,
  301. d2.actualweightminus actualWeight,
  302. if((select count(1) from downloadplandtl2 where pid = d.id and sort < d2.sort order by sort desc) >0 ,(select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl2 where pid = d.id and sort < d2.sort order by sort desc limit 1), (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl1 where pid = d.id order by sort desc limit 1) ) startTime,
  303. DATE_FORMAT(d2.intime,'%Y-%m-%d %H:%i:%s') endTime
  304. FROM
  305. downloadedplan d
  306. JOIN downloadplandtl2 d2 ON d2.pid = d.id
  307. JOIN bar b ON b.id = d2.fbarid
  308. join feedp fp on fp.barid = b.id
  309. join tmr t on t.id = d.tmrid
  310. left join driver on driver.drivercode = t.eqcode
  311. WHERE
  312. DATE_FORMAT( d.mydate ,'%Y-%m-%d' ) = ? `, date).QueryString()
  313. if err != nil {
  314. logs.Error("feedtempletPush-error-1:", err)
  315. return
  316. }
  317. pushStr := `{
  318. "apiId": "getKPTData",
  319. "param": {
  320. "farmId": "%s",
  321. "method":"uploaddiliverdata",
  322. "rowCount": "%d",
  323. "resultData":%s
  324. }
  325. }`
  326. if len(dataList) > 0 {
  327. b, _ := json.Marshal(dataList)
  328. pushStr = fmt.Sprintf(pushStr, setting.YynserverSetting.FarmId, len(dataList), string(b))
  329. token := c.Publish(pubTopic, 2, false, pushStr)
  330. fmt.Println("publish msg: ", pushStr, token.Error())
  331. // token.Wait()
  332. // time.Sleep(2 * time.Second)
  333. }
  334. }
  335. //设备心跳
  336. func deviceHeartbeat(c MQTT.Client, pubTopic string) {
  337. pushStr := fmt.Sprintf(`{"data_collect_number":%s,"status":true,"model_type":"heartbeat"}`, setting.YynserverSetting.HeartBeat)
  338. c.Publish(pubTopic, 2, false, pushStr)
  339. // fmt.Println("publish msg: ", pushStr, token.Error())
  340. // token.Wait()
  341. // go func() {
  342. duetimecst2, _ := time.ParseInLocation("15:04:05", "00:01:00", time.Local)
  343. duetimecst3, _ := time.ParseInLocation("15:04:05", "00:00:00", time.Local)
  344. spec1 := fmt.Sprintf("@every %v", duetimecst2.Sub(duetimecst3))
  345. // for {
  346. device := cron.New()
  347. device.AddFunc(spec1, func() {
  348. token := c.Publish(pubTopic, 2, false, pushStr)
  349. fmt.Println("publish msg: ", pushStr, token.Error(), time.Now())
  350. token.Wait()
  351. })
  352. // }
  353. device.Start()
  354. // }()
  355. }
  356. // 准确率
  357. func equipmentAccuracyPush(c MQTT.Client, pubTopic, date string) {
  358. tx := restful.Engine.NewSession()
  359. defer tx.Close()
  360. dataList, err := tx.SQL(`SELECT
  361. t.tname Name,
  362. 1-abs (
  363. sum( d1.actualweightminus )- sum( d1.lweight ))/ sum( d1.lweight ) Rate,
  364. d.mydate RateDate , DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') startTime,
  365. (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadplandtl2 where pid = d.id order by sort desc limit 1) endTime
  366. FROM
  367. downloadedplan d
  368. JOIN downloadplandtl1 d1 ON d1.pid = d.id
  369. JOIN tmr t ON t.datacaptureno = d.datacaptureno
  370. WHERE
  371. DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = ? and d.lpplantype in(0,1)
  372. GROUP BY
  373. d.datacaptureno`, date).QueryString()
  374. if err != nil {
  375. logs.Error("feedtempletPush-error-1:", err)
  376. return
  377. }
  378. pushStr := `{
  379. "apiId": "getKPTData",
  380. "param": {
  381. "resultData": %s,
  382. "farmId": "%s",
  383. "method": "uploadrate",
  384. "rowCount": %d
  385. }
  386. }`
  387. if len(dataList) > 0 {
  388. b, _ := json.Marshal(dataList)
  389. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId, len(dataList))
  390. token := c.Publish(pubTopic, 2, false, pushStr)
  391. // fmt.Println("publish msg: ", pushStr, token.Error())
  392. token.Wait()
  393. // time.Sleep(2 * time.Second)
  394. }
  395. }
  396. // 完成重量
  397. func finishedWeightPush(c MQTT.Client, pubTopic, date string) {
  398. tx := restful.Engine.NewSession()
  399. defer tx.Close()
  400. dataList, err := tx.SQL(`SELECT
  401. sum( d1.actualweightminus ) completeWeight,
  402. sum( d1.lweight ) planWeight,
  403. d.mydate weightDate , DATE_FORMAT(d.intime,'%Y-%m-%d %H:%i:%s') startTime,
  404. (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where mydate = d.mydate and intime is not null order by sort desc limit 1) endTime
  405. FROM
  406. downloadedplan d
  407. JOIN downloadplandtl1 d1 ON d1.pid = d.id
  408. JOIN tmr t ON t.datacaptureno = d.datacaptureno
  409. WHERE
  410. DATE_FORMAT( d.mydate, '%Y-%m-%d' ) = ? and lpplantype in(0,1)`, date).QueryString()
  411. if err != nil {
  412. logs.Error("feedtempletPush-error-1:", err)
  413. return
  414. }
  415. pushStr := `{
  416. "apiId": "getKPTData",
  417. "param": {
  418. "resultData": %s,
  419. "farmId": "%s",
  420. "method": "uploadweight",
  421. "rowCount": "1"
  422. }
  423. }`
  424. if len(dataList) > 0 {
  425. b, _ := json.Marshal(dataList)
  426. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
  427. token := c.Publish(pubTopic, 2, false, pushStr)
  428. // fmt.Println("publish msg: ", pushStr, token.Error())
  429. token.Wait()
  430. // time.Sleep(2 * time.Second)
  431. }
  432. }
  433. // 完成车次
  434. func CompletedTrainNumberPush(c MQTT.Client, pubTopic, date string) {
  435. tx := restful.Engine.NewSession()
  436. defer tx.Close()
  437. dataList, err := tx.SQL(` select (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and lpplantype in(0,1) ) planCar,
  438. (select count(1) from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1)) CompleteCar ,
  439. ? carDate,
  440. (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1) order by intime asc limit 1 ) startTime,
  441. (select DATE_FORMAT(intime,'%Y-%m-%d %H:%i:%s') from downloadedplan where DATE_FORMAT(mydate ,'%Y-%m-%d' ) = ? and iscompleted = 1 and lpplantype in(0,1) and intime is not null order by intime desc limit 1 ) endTime
  442. `, date, date, date, date, date).QueryString()
  443. if err != nil {
  444. logs.Error("feedtempletPush-error-1:", err)
  445. return
  446. }
  447. pushStr := `{
  448. "apiId": "getKPTData",
  449. "param": {
  450. "resultData": %s,
  451. "farmId": "%s",
  452. "method": "uploadcarnumber",
  453. "rowCount": "1"
  454. }
  455. }`
  456. if len(dataList) > 0 {
  457. b, _ := json.Marshal(dataList)
  458. pushStr = fmt.Sprintf(pushStr, string(b), setting.YynserverSetting.FarmId)
  459. token := c.Publish(pubTopic, 2, false, pushStr)
  460. // fmt.Println("publish msg: ", pushStr, token.Error())
  461. token.Wait()
  462. // time.Sleep(2 * time.Second)
  463. }
  464. }
  465. func feedHeatwatch(client MQTT.Client, msg MQTT.Message) {
  466. tx := restful.Engine.NewSession()
  467. defer tx.Close()
  468. data := make(map[string]interface{})
  469. json.Unmarshal(msg.Payload(), &data)
  470. if _, ok := data["feedData"]; ok {
  471. for _, item := range data["feedData"].([]map[string]interface{}) {
  472. tx.SQL(` insert into feed(pastureid,feedcode,fname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
  473. WHERE table_name = 'recweight' AND table_schema = 'tmrwatch2' AND column_name = 'pastureid'),?,?)
  474. ON DUPLICATE KEY UPDATE feedcode = ?,fname = ? `, item["feedCode"], item["feedName"], item["feedCode"], item["feedName"]).Execute()
  475. }
  476. } else if _, ok := data["barData"]; ok {
  477. for _, item := range data["barData"].([]map[string]interface{}) {
  478. tx.SQL(` insert into bar(pastureid,bcode,bname)values((SELECT column_default INTO pastureidTem FROM information_schema.COLUMNS
  479. WHERE table_name = 'recweight' AND table_schema = 'tmrwatch2' AND column_name = 'pastureid'),?,?)
  480. ON DUPLICATE KEY UPDATE bcode = ?,bname = ? `, item["barCode"], item["barName"], item["barCode"], item["barName"]).Execute()
  481. }
  482. }
  483. }
  484. func LabourStirPush(cq *gin.Context) {
  485. appG := app.Gin{C: cq}
  486. dataByte, _ := ioutil.ReadAll(cq.Request.Body)
  487. fsion := gofasion.NewFasion(string(dataByte))
  488. date := fsion.Get("date").ValueStr()
  489. stirPush(c, pubTopic, date)
  490. appG.Response(http.StatusOK, e.SUCCESS, true)
  491. }
  492. func LabourDustingPush(cq *gin.Context) {
  493. appG := app.Gin{C: cq}
  494. dataByte, _ := ioutil.ReadAll(cq.Request.Body)
  495. fsion := gofasion.NewFasion(string(dataByte))
  496. date := fsion.Get("date").ValueStr()
  497. dustingPush(c, pubTopic, date)
  498. appG.Response(http.StatusOK, e.SUCCESS, true)
  499. }
  500. func getApiToken() (string, error) {
  501. url := "https://zhmc-api.yunyangniu.com/oauth/oauth/token"
  502. bodyMap := map[string]string{
  503. "grant_type": "client_credentials",
  504. "client_id": "HouseInfo",
  505. "client_secret": "secret",
  506. }
  507. // 构建带参数的 URL
  508. urlWithParams := url + "?"
  509. for key, value := range bodyMap {
  510. urlWithParams += fmt.Sprintf("%s=%s&", key, value)
  511. }
  512. // 移除末尾的 "&"
  513. urlWithParams = urlWithParams[:len(urlWithParams)-1]
  514. // 发送 POST 请求
  515. resp, err := http.Post(urlWithParams, "application/x-www-form-urlencoded", nil)
  516. if err != nil {
  517. return "", err
  518. }
  519. defer resp.Body.Close()
  520. // 读取响应内容
  521. body, err := ioutil.ReadAll(resp.Body)
  522. if err != nil {
  523. return "", err
  524. }
  525. // 解析 JSON 响应
  526. var tokenVo map[string]interface{}
  527. err = json.Unmarshal(body, &tokenVo)
  528. if err != nil {
  529. return "", err
  530. }
  531. // 获取 access_token
  532. token, ok := tokenVo["access_token"].(string)
  533. if !ok {
  534. return "", fmt.Errorf("access_token not found in response")
  535. }
  536. return token, nil
  537. }
  538. func GetFeedDataFromApi() {
  539. // mapResult := make(map[string]interface{})
  540. // 获取 API Token
  541. tokenString, err := getApiToken()
  542. if err != nil {
  543. return
  544. }
  545. if tokenString == "" {
  546. return
  547. }
  548. // 构建 URL
  549. urlString := fmt.Sprintf("https://zhmc-api.yunyangniu.com/duijie/v1/breed/synMaterialInfo?farmNum=%s&secret=%s", setting.YynserverSetting.FarmId, setting.YynserverSetting.YynSecret)
  550. // 构建请求头
  551. headerMap := map[string]string{
  552. "username": "KPT_USER",
  553. "Authorization": "Bearer " + tokenString,
  554. }
  555. // 发送 POST 请求
  556. reDataString, err := sendPost(urlString, headerMap, nil)
  557. if err != nil {
  558. return
  559. }
  560. if reDataString != "" {
  561. var resultMap map[string]interface{}
  562. err = json.Unmarshal([]byte(reDataString), &resultMap)
  563. if err != nil {
  564. return
  565. }
  566. msgCode, ok := resultMap["msgCode"].(string)
  567. if !ok || msgCode != "10000" {
  568. return
  569. }
  570. contentMap, ok := resultMap["content"].(map[string]interface{})
  571. if !ok {
  572. return
  573. }
  574. b, _ := json.Marshal(contentMap["materialInfoList"])
  575. feedList := make([]*models.YYNMaterialInfo, 0)
  576. json.Unmarshal(b, &feedList)
  577. tx := restful.Engine.NewSession()
  578. defer tx.Close()
  579. pastureinfo := new(udPastureInfo)
  580. err := tx.SQL(`select column_default as pastureid,(select werks from pasture where pastureid = column_default) werks from information_schema.COLUMNS
  581. WHERE table_name = 'recweight' AND table_schema = ? AND column_name = 'pastureid'`, setting.DatabaseSetting.Name).GetFirst(pastureinfo).Error
  582. if err != nil {
  583. logs.Error(err)
  584. return
  585. }
  586. for _, feed := range feedList {
  587. ids, err := setting.SnowIds.NextId()
  588. if err != nil {
  589. ids = time.Now().UnixNano()
  590. logging.Info("create SnowIds err", err)
  591. }
  592. _, err = tx.Exec(` insert into feedclass(id,pastureid,fcname,fccode,bigfeedclassid,bigfeedclassname)values(?,?,?,?,?,?) ON DUPLICATE KEY UPDATE backup1 = '云养牛'`,
  593. ids, pastureinfo.Pastureid, feed.CategoryName, feed.CategoryCode, ids, feed.CategoryName)
  594. if err != nil {
  595. logs.Error(err)
  596. return
  597. }
  598. ids1, err := setting.SnowIds.NextId()
  599. if err != nil {
  600. ids1 = time.Now().UnixNano()
  601. logging.Info("create SnowIds err", err)
  602. }
  603. _, err = tx.Exec(` insert into feed(id,pastureid,fname,feedcode,fclass,fclassid,source)
  604. values(?,?,?,?,?,(select id from feedclass where pastureid = ? and fccode = ? ),'云养牛') ON DUPLICATE KEY UPDATE source = '云养牛' `,
  605. ids1, pastureinfo.Pastureid, feed.MaterialName, feed.MaterialCode, feed.CategoryName, pastureinfo.Pastureid, feed.CategoryCode)
  606. fmt.Println(err)
  607. }
  608. }
  609. }
  610. func getBarDataFromApi() {
  611. // 获取 API Token
  612. tokenString, err := getApiToken()
  613. if err != nil {
  614. return
  615. }
  616. if tokenString == "" {
  617. return
  618. }
  619. // 构建 URL
  620. urlString := fmt.Sprintf("https://zhmc-api.yunyangniu.com/duijie/v1/breed/synCowHouseInfo?farmNum=%s", setting.YynserverSetting.FarmId)
  621. urlString += fmt.Sprintf("&secret=%s", setting.YynserverSetting.YynSecret)
  622. // 构建请求头
  623. headerMap := map[string]string{
  624. "username": "KPT_USER",
  625. "Authorization": "Bearer " + tokenString,
  626. }
  627. // 发送 POST 请求
  628. reDataString, err := sendPost(urlString, headerMap, nil)
  629. if err != nil {
  630. return
  631. }
  632. // fmt.Println(time.Now(), "牛舍接口返回:", reDataString)
  633. if reDataString != "" {
  634. var resultMap map[string]interface{}
  635. err = json.Unmarshal([]byte(reDataString), &resultMap)
  636. if err != nil {
  637. return
  638. }
  639. msgCode, ok := resultMap["msgCode"].(string)
  640. if !ok || msgCode != "10000" {
  641. return
  642. }
  643. contentMap, ok := resultMap["content"].(map[string]interface{})
  644. if !ok {
  645. return
  646. }
  647. cowHouseList, ok := contentMap["cowHouseList"].([]interface{})
  648. if !ok {
  649. return
  650. }
  651. fmt.Println(time.Now(), "牛舍列表返回:", cowHouseList)
  652. // cowAmount:57 farmName:吉林省牧硕养殖有限公司 farmNum:1830002720 farmUuid:737a6a60094011e9802143b3b3c94e01 houseName:1-3东蹄病
  653. // houseNum:131 houseType:lactation houseTypeName:泌乳牛舍 houseUuid:05f3e0a3127f11efbf42d1859aea01b1
  654. // _, err := tx.SQL(` insert into bar(pastureid,bcode,bname,sort,class,classcode,cattle,cattlecode,sapcode)
  655. // values(?,?,?,(select max(sort)+1 from bar b where b.pastureid = ? ),
  656. // (select distName from dist where distCode = ? and distType = '牛舍类型' ),?,
  657. // (select distName from dist where distCode = ? and distType = '牛群类别'),?,?)
  658. // ON DUPLICATE KEY UPDATE bname = ? ,
  659. // class = (select distName from dist where distCode = ? and distType = '牛舍类型' ) ,classcode = ?,
  660. // cattle = (select distName from dist where distCode = ? and distType = '牛群类别' ) ,cattlecode = ?
  661. // `,
  662. // pastureId, barMap["CHSNO"], fmt.Sprintf("%v_sap", barMap["CHSTX"]), pastureId, barMap["CHSTY"], barMap["CHSTY"],
  663. // barMap["FCWTS"], barMap["FCWTS"], barMap["CHSNO"],
  664. // fmt.Sprintf("%v_sap", barMap["CHSTX"]), barMap["CHSTY"], barMap["CHSTY"], barMap["FCWTS"], barMap["FCWTS"]).Execute()
  665. // if err != nil {
  666. // logs.Error(err)
  667. // return
  668. //}
  669. }
  670. }
  671. func sendPost(url string, headers map[string]string, body []byte) (string, error) {
  672. req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
  673. if err != nil {
  674. return "", err
  675. }
  676. for key, value := range headers {
  677. req.Header.Set(key, value)
  678. }
  679. client := &http.Client{}
  680. resp, err := client.Do(req)
  681. if err != nil {
  682. return "", err
  683. }
  684. defer resp.Body.Close()
  685. respBody, err := ioutil.ReadAll(resp.Body)
  686. if err != nil {
  687. return "", err
  688. }
  689. return string(respBody), nil
  690. }