mqtt_handle.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/jinzhu/copier"
  11. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  12. "gitee.com/xuyiping_admin/pkg/xerr"
  13. "go.uber.org/zap"
  14. "gorm.io/gorm"
  15. )
  16. type DataInsertNeckRingLog struct {
  17. NeckRingOriginalData []*model.NeckRingOriginal
  18. NeckRingErrorData []*model.NeckRingError
  19. }
  20. var (
  21. DSMLog = &DataInsertNeckRingLog{
  22. NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
  23. NeckRingErrorData: make([]*model.NeckRingError, 0),
  24. }
  25. mu sync.Mutex
  26. receiverMap map[string]int64 // 接收器数据
  27. receiverMutex sync.Mutex
  28. )
  29. // InitReceiverMapUpdater 2小时更新接收器数据
  30. func (e *Entry) InitReceiverMapUpdater() {
  31. go func() {
  32. ticker := time.NewTicker(2 * time.Hour)
  33. defer ticker.Stop()
  34. for {
  35. select {
  36. case <-ticker.C:
  37. receiverMutex.Lock()
  38. receiverMap = e.FindAppPastureReceiver()
  39. receiverMutex.Unlock()
  40. zaplog.Info("ReceiverMap updated successfully")
  41. }
  42. }
  43. }()
  44. }
  45. func (e *Entry) NeckRingHandle(data []byte) {
  46. receiverMutex.Lock()
  47. newReceiverMap := receiverMap
  48. receiverMutex.Unlock()
  49. newData := e.MsgDataFormat2(data, newReceiverMap)
  50. if newData == nil {
  51. return
  52. }
  53. if len(newData.NeckRingErrorData) > 0 || len(newData.NeckRingOriginalData) > 0 {
  54. // 写入数据
  55. if err := e.CreatedData(newData, data); err != nil {
  56. zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", newData))
  57. }
  58. }
  59. }
  60. func (e *Entry) FindAppPastureReceiver() map[string]int64 {
  61. appPastureReceiverList := make([]*model.AppPastureReceiver, 0)
  62. if err := e.DB.Model(new(model.AppPastureReceiver)).
  63. Find(&appPastureReceiverList).Error; err != nil {
  64. zaplog.Error("FindAppPastureReceiver", zap.Any("err", err))
  65. }
  66. newReceiverMap := make(map[string]int64)
  67. for _, v := range appPastureReceiverList {
  68. newReceiverMap[v.ReceiverNumber] = v.PastureId
  69. }
  70. return newReceiverMap
  71. }
  72. // 处理批量数据
  73. func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
  74. // 初始化分类数据
  75. var (
  76. errorData []*model.NeckRingError
  77. originalData []*model.NeckRingOriginal
  78. )
  79. // 分类数据
  80. for _, batch := range batchList {
  81. // 异常脖环数据
  82. if ok := util.IsValidFrameId(batch.Frameid); !ok {
  83. var ed model.NeckRingError
  84. err := copier.Copy(&ed, &batch)
  85. if err != nil {
  86. zaplog.Error("processBatch", zap.Any("copier", err), zap.Any("data", batch))
  87. continue
  88. }
  89. errorData = append(errorData, &ed)
  90. } else {
  91. originalData = append(originalData, batch)
  92. }
  93. }
  94. // 更新日志
  95. DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...)
  96. DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...)
  97. // 写入数据
  98. if err := e.CreatedData(DSMLog, []byte{}); err != nil {
  99. zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", DSMLog))
  100. }
  101. // 清空日志
  102. DSMLog.NeckRingErrorData = DSMLog.NeckRingErrorData[:0]
  103. DSMLog.NeckRingOriginalData = DSMLog.NeckRingOriginalData[:0]
  104. }
  105. func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog, originalData []byte) error {
  106. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  107. if len(DSMLog.NeckRingErrorData) > 0 {
  108. if err := e.DB.Model(new(model.NeckRingError)).
  109. Create(DSMLog.NeckRingErrorData).Error; err != nil {
  110. return xerr.WithStack(err)
  111. }
  112. }
  113. if len(DSMLog.NeckRingOriginalData) > 0 {
  114. // 批量写入数据
  115. if err := e.DB.Model(new(model.NeckRingOriginal)).
  116. Create(DSMLog.NeckRingOriginalData).Error; err != nil {
  117. return xerr.WithStack(err)
  118. }
  119. }
  120. return nil
  121. }); err != nil {
  122. return xerr.WithStack(err)
  123. }
  124. return nil
  125. }
  126. func (e *Entry) MsgDataFormat2(msg []byte, receiverMap map[string]int64) *DataInsertNeckRingLog {
  127. mu.Lock()
  128. defer mu.Unlock()
  129. // 深拷贝 msg,防止底层数据被修改
  130. msgCopy := make([]byte, len(msg))
  131. copy(msgCopy, msg)
  132. neckLogList := &model.NeckRingWrapper{}
  133. if err := json.Unmarshal(msgCopy, neckLogList); err != nil {
  134. zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msgCopy)))
  135. return nil
  136. }
  137. if neckLogList.Type == "heartbeat" {
  138. return nil
  139. }
  140. normalOriginal := make([]*model.NeckRingOriginal, 0)
  141. errorOriginal := make([]*model.NeckRingError, 0)
  142. for _, neckLog := range neckLogList.NeckRing.NeckPck {
  143. pastureId, ok := receiverMap[neckLog.Imei]
  144. if !ok {
  145. continue
  146. }
  147. newOriginal := model.NewNeckRingOriginal(neckLog, pastureId)
  148. if ok = util.IsValidFrameId(neckLog.Frameid); !ok {
  149. var ed model.NeckRingError
  150. if err := copier.Copy(&ed, &newOriginal); err != nil {
  151. zaplog.Error("MsgDataFormat2", zap.Any("copier", err), zap.Any("neckLog", neckLog))
  152. continue
  153. }
  154. errorOriginal = append(errorOriginal, &ed)
  155. } else {
  156. activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
  157. newOriginal.ActiveDate = activeDate
  158. newOriginal.Hours = int32(hours)
  159. normalOriginal = append(normalOriginal, newOriginal)
  160. }
  161. }
  162. return &DataInsertNeckRingLog{
  163. NeckRingErrorData: errorOriginal,
  164. NeckRingOriginalData: normalOriginal,
  165. }
  166. }
  167. func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
  168. msgData := make(map[string]interface{})
  169. pairs := strings.Split(util.MsgFormat(string(msg)), " ")
  170. for _, pair := range pairs {
  171. parts := strings.SplitN(pair, ":", 2)
  172. if len(parts) != 2 {
  173. continue
  174. }
  175. key, value := parts[0], parts[1]
  176. if len(key) == 0 {
  177. continue
  178. }
  179. msgData[key] = value
  180. }
  181. softVer := int64(0)
  182. if softVerInter, ok := msgData["SOFT_VER"]; ok {
  183. if softVerstr, ok := softVerInter.(string); ok {
  184. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  185. }
  186. }
  187. if softVer <= 0 {
  188. if softVerInter, ok := msgData["soft_ver"]; ok {
  189. if softVerstr, ok := softVerInter.(string); ok {
  190. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  191. }
  192. }
  193. }
  194. uuid := ""
  195. if uuidInter, ok := msgData["uuid"]; ok {
  196. if uuidStr, ok := uuidInter.(string); ok {
  197. uuid = uuidStr
  198. }
  199. }
  200. frameId := int64(0)
  201. if frameIdInter, ok := msgData["frameid"]; ok {
  202. if frameId64, ok := frameIdInter.(string); ok {
  203. frameId, _ = strconv.ParseInt(frameId64, 10, 64)
  204. }
  205. }
  206. temp := float64(0)
  207. if tempInter, ok := msgData["Temp"]; ok {
  208. if tempFloat, ok := tempInter.(string); ok {
  209. temp, _ = strconv.ParseFloat(tempFloat, 64)
  210. }
  211. }
  212. if temp <= 0 {
  213. if tempInter, ok := msgData["temp"]; ok {
  214. if tempFloat, ok := tempInter.(string); ok {
  215. temp, _ = strconv.ParseFloat(tempFloat, 64)
  216. }
  217. }
  218. }
  219. imei := ""
  220. if imeiInter, ok := msgData["imei"]; ok {
  221. if imeiStr, ok := imeiInter.(string); ok {
  222. imei = imeiStr
  223. }
  224. }
  225. active := int64(0)
  226. if activeInter, ok := msgData["active"]; ok {
  227. if active32, ok := activeInter.(string); ok {
  228. active, _ = strconv.ParseInt(active32, 10, 64)
  229. }
  230. }
  231. inAction := int64(0)
  232. if inActionInter, ok := msgData["inactive"]; ok {
  233. if inAction32, ok := inActionInter.(string); ok {
  234. inAction, _ = strconv.ParseInt(inAction32, 10, 64)
  235. }
  236. }
  237. ruMina := int64(0)
  238. if ruMinaInter, ok := msgData["Rumina"]; ok {
  239. if ruMina32, ok := ruMinaInter.(string); ok {
  240. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  241. }
  242. }
  243. if ruMina <= 0 {
  244. if ruMinaInter, ok := msgData["rumina"]; ok {
  245. if ruMina32, ok := ruMinaInter.(string); ok {
  246. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  247. }
  248. }
  249. }
  250. intake := int64(0)
  251. if intakeInter, ok := msgData["Intake"]; ok {
  252. if intake32, ok := intakeInter.(string); ok {
  253. intake, _ = strconv.ParseInt(intake32, 10, 64)
  254. }
  255. }
  256. if intake <= 0 {
  257. if intakeInter, ok := msgData["intake"]; ok {
  258. if intake32, ok := intakeInter.(string); ok {
  259. intake, _ = strconv.ParseInt(intake32, 10, 64)
  260. }
  261. }
  262. }
  263. gasp := int64(0)
  264. if gaspInter, ok := msgData["gasp"]; ok {
  265. if gasp32, ok := gaspInter.(string); ok {
  266. gasp, _ = strconv.ParseInt(gasp32, 10, 64)
  267. }
  268. }
  269. reMain := int64(0)
  270. if reMainInter, ok := msgData["Remain"]; ok {
  271. if reMain32, ok := reMainInter.(string); ok {
  272. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  273. }
  274. }
  275. if ruMina <= 0 {
  276. if reMainInter, ok := msgData["remain"]; ok {
  277. if reMain32, ok := reMainInter.(string); ok {
  278. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  279. }
  280. }
  281. }
  282. /*cowId := ""
  283. if cowIdInter, ok := msgData["cowid"]; ok {
  284. if cowIdStr, ok := cowIdInter.(string); ok {
  285. cowId = cowIdStr
  286. }
  287. }
  288. csq := int64(0)
  289. if csqInter, ok := msgData["csq"]; ok {
  290. if csq32, ok := csqInter.(string); ok {
  291. csq, _ = strconv.ParseInt(csq32, 10, 64)
  292. }
  293. }
  294. other := int64(0)
  295. if otherInter, ok := msgData["other"]; ok {
  296. if other32, ok := otherInter.(string); ok {
  297. other, _ = strconv.ParseInt(other32, 10, 64)
  298. }
  299. }
  300. nccId := ""
  301. if nccIdInter, ok := msgData["nccid"]; ok {
  302. if nccIdStr, ok := nccIdInter.(string); ok {
  303. nccId = nccIdStr
  304. }
  305. }
  306. */
  307. return []*model.NeckRingOriginal{
  308. {
  309. FirmwareVersion: int32(softVer),
  310. Uuid: uuid,
  311. Frameid: int32(frameId),
  312. ReceiveNumber: imei,
  313. Active: int32(active),
  314. Inactive: int32(inAction),
  315. Rumina: int32(ruMina),
  316. Intake: int32(intake),
  317. Gasp: int32(gasp),
  318. Remain: int32(reMain),
  319. },
  320. }
  321. }