mqtt_handle.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/jinzhu/copier"
  11. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  12. "gitee.com/xuyiping_admin/pkg/xerr"
  13. "go.uber.org/zap"
  14. "gorm.io/gorm"
  15. )
  16. type DataInsertNeckRingLog struct {
  17. NeckRingOriginalData []*model.NeckRingOriginal
  18. NeckRingErrorData []*model.NeckRingError
  19. }
  20. var (
  21. DSMLog = &DataInsertNeckRingLog{
  22. NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
  23. NeckRingErrorData: make([]*model.NeckRingError, 0),
  24. }
  25. mu sync.Mutex
  26. receiverMap map[string]int64 // 接收器数据
  27. receiverMutex sync.Mutex
  28. )
  29. // InitReceiverMapUpdater 2小时更新接收器数据
  30. func (e *Entry) InitReceiverMapUpdater() {
  31. go func() {
  32. ticker := time.NewTicker(2 * time.Hour)
  33. defer ticker.Stop()
  34. zaplog.Info("ReceiverMap updated successfully1")
  35. for {
  36. select {
  37. case <-ticker.C:
  38. receiverMutex.Lock()
  39. receiverMap = e.FindAppPastureReceiver()
  40. receiverMutex.Unlock()
  41. zaplog.Info("ReceiverMap updated successfully2")
  42. }
  43. }
  44. }()
  45. }
  46. func (e *Entry) NeckRingHandle(data []byte) {
  47. newReceiverMap := receiverMap
  48. newData := e.MsgDataFormat2(data, newReceiverMap)
  49. if newData == nil {
  50. return
  51. }
  52. if len(newData.NeckRingErrorData) > 0 || len(newData.NeckRingOriginalData) > 0 {
  53. // 写入数据
  54. if err := e.CreatedData(newData, data); err != nil {
  55. zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", newData))
  56. }
  57. }
  58. }
  59. func (e *Entry) FindAppPastureReceiver() map[string]int64 {
  60. appPastureReceiverList := make([]*model.AppPastureReceiver, 0)
  61. if err := e.DB.Model(new(model.AppPastureReceiver)).
  62. Find(&appPastureReceiverList).Error; err != nil {
  63. zaplog.Error("FindAppPastureReceiver", zap.Any("err", err))
  64. }
  65. newReceiverMap := make(map[string]int64)
  66. for _, v := range appPastureReceiverList {
  67. newReceiverMap[v.ReceiverNumber] = v.PastureId
  68. }
  69. return newReceiverMap
  70. }
  71. // 处理批量数据
  72. func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
  73. // 初始化分类数据
  74. var (
  75. errorData []*model.NeckRingError
  76. originalData []*model.NeckRingOriginal
  77. )
  78. // 分类数据
  79. for _, batch := range batchList {
  80. // 异常脖环数据
  81. if ok := util.IsValidFrameId(batch.Frameid); !ok {
  82. var ed model.NeckRingError
  83. err := copier.Copy(&ed, &batch)
  84. if err != nil {
  85. zaplog.Error("processBatch", zap.Any("copier", err), zap.Any("data", batch))
  86. continue
  87. }
  88. errorData = append(errorData, &ed)
  89. } else {
  90. originalData = append(originalData, batch)
  91. }
  92. }
  93. // 更新日志
  94. DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...)
  95. DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...)
  96. // 写入数据
  97. if err := e.CreatedData(DSMLog, []byte{}); err != nil {
  98. zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", DSMLog))
  99. }
  100. // 清空日志
  101. DSMLog.NeckRingErrorData = DSMLog.NeckRingErrorData[:0]
  102. DSMLog.NeckRingOriginalData = DSMLog.NeckRingOriginalData[:0]
  103. }
  104. func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog, originalData []byte) error {
  105. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  106. if len(DSMLog.NeckRingErrorData) > 0 {
  107. if err := e.DB.Model(new(model.NeckRingError)).
  108. Create(DSMLog.NeckRingErrorData).Error; err != nil {
  109. return xerr.WithStack(err)
  110. }
  111. }
  112. if len(DSMLog.NeckRingOriginalData) > 0 {
  113. // 批量写入数据
  114. if err := e.DB.Model(new(model.NeckRingOriginal)).
  115. Create(DSMLog.NeckRingOriginalData).Error; err != nil {
  116. return xerr.WithStack(err)
  117. }
  118. }
  119. return nil
  120. }); err != nil {
  121. return xerr.WithStack(err)
  122. }
  123. return nil
  124. }
  125. func (e *Entry) MsgDataFormat2(msg []byte, receiverMap map[string]int64) *DataInsertNeckRingLog {
  126. mu.Lock()
  127. defer mu.Unlock()
  128. // 深拷贝 msg,防止底层数据被修改
  129. msgCopy := make([]byte, len(msg))
  130. copy(msgCopy, msg)
  131. neckLogList := &model.NeckRingWrapper{}
  132. if err := json.Unmarshal(msgCopy, neckLogList); err != nil {
  133. zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msgCopy)))
  134. return nil
  135. }
  136. if neckLogList.Type == "heartbeat" {
  137. return nil
  138. }
  139. normalOriginal := make([]*model.NeckRingOriginal, 0)
  140. errorOriginal := make([]*model.NeckRingError, 0)
  141. for _, neckLog := range neckLogList.NeckRing.NeckPck {
  142. pastureId, ok := receiverMap[neckLog.Imei]
  143. if !ok {
  144. continue
  145. }
  146. newOriginal := model.NewNeckRingOriginal(neckLog, pastureId)
  147. if ok = util.IsValidFrameId(neckLog.Frameid); !ok {
  148. var ed model.NeckRingError
  149. if err := copier.Copy(&ed, &newOriginal); err != nil {
  150. zaplog.Error("MsgDataFormat2", zap.Any("copier", err), zap.Any("neckLog", neckLog))
  151. continue
  152. }
  153. errorOriginal = append(errorOriginal, &ed)
  154. } else {
  155. activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
  156. newOriginal.ActiveDate = activeDate
  157. newOriginal.Hours = int32(hours)
  158. normalOriginal = append(normalOriginal, newOriginal)
  159. }
  160. }
  161. return &DataInsertNeckRingLog{
  162. NeckRingErrorData: errorOriginal,
  163. NeckRingOriginalData: normalOriginal,
  164. }
  165. }
  166. func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
  167. msgData := make(map[string]interface{})
  168. pairs := strings.Split(util.MsgFormat(string(msg)), " ")
  169. for _, pair := range pairs {
  170. parts := strings.SplitN(pair, ":", 2)
  171. if len(parts) != 2 {
  172. continue
  173. }
  174. key, value := parts[0], parts[1]
  175. if len(key) == 0 {
  176. continue
  177. }
  178. msgData[key] = value
  179. }
  180. softVer := int64(0)
  181. if softVerInter, ok := msgData["SOFT_VER"]; ok {
  182. if softVerstr, ok := softVerInter.(string); ok {
  183. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  184. }
  185. }
  186. if softVer <= 0 {
  187. if softVerInter, ok := msgData["soft_ver"]; ok {
  188. if softVerstr, ok := softVerInter.(string); ok {
  189. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  190. }
  191. }
  192. }
  193. uuid := ""
  194. if uuidInter, ok := msgData["uuid"]; ok {
  195. if uuidStr, ok := uuidInter.(string); ok {
  196. uuid = uuidStr
  197. }
  198. }
  199. frameId := int64(0)
  200. if frameIdInter, ok := msgData["frameid"]; ok {
  201. if frameId64, ok := frameIdInter.(string); ok {
  202. frameId, _ = strconv.ParseInt(frameId64, 10, 64)
  203. }
  204. }
  205. temp := float64(0)
  206. if tempInter, ok := msgData["Temp"]; ok {
  207. if tempFloat, ok := tempInter.(string); ok {
  208. temp, _ = strconv.ParseFloat(tempFloat, 64)
  209. }
  210. }
  211. if temp <= 0 {
  212. if tempInter, ok := msgData["temp"]; ok {
  213. if tempFloat, ok := tempInter.(string); ok {
  214. temp, _ = strconv.ParseFloat(tempFloat, 64)
  215. }
  216. }
  217. }
  218. imei := ""
  219. if imeiInter, ok := msgData["imei"]; ok {
  220. if imeiStr, ok := imeiInter.(string); ok {
  221. imei = imeiStr
  222. }
  223. }
  224. active := int64(0)
  225. if activeInter, ok := msgData["active"]; ok {
  226. if active32, ok := activeInter.(string); ok {
  227. active, _ = strconv.ParseInt(active32, 10, 64)
  228. }
  229. }
  230. inAction := int64(0)
  231. if inActionInter, ok := msgData["inactive"]; ok {
  232. if inAction32, ok := inActionInter.(string); ok {
  233. inAction, _ = strconv.ParseInt(inAction32, 10, 64)
  234. }
  235. }
  236. ruMina := int64(0)
  237. if ruMinaInter, ok := msgData["Rumina"]; ok {
  238. if ruMina32, ok := ruMinaInter.(string); ok {
  239. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  240. }
  241. }
  242. if ruMina <= 0 {
  243. if ruMinaInter, ok := msgData["rumina"]; ok {
  244. if ruMina32, ok := ruMinaInter.(string); ok {
  245. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  246. }
  247. }
  248. }
  249. intake := int64(0)
  250. if intakeInter, ok := msgData["Intake"]; ok {
  251. if intake32, ok := intakeInter.(string); ok {
  252. intake, _ = strconv.ParseInt(intake32, 10, 64)
  253. }
  254. }
  255. if intake <= 0 {
  256. if intakeInter, ok := msgData["intake"]; ok {
  257. if intake32, ok := intakeInter.(string); ok {
  258. intake, _ = strconv.ParseInt(intake32, 10, 64)
  259. }
  260. }
  261. }
  262. gasp := int64(0)
  263. if gaspInter, ok := msgData["gasp"]; ok {
  264. if gasp32, ok := gaspInter.(string); ok {
  265. gasp, _ = strconv.ParseInt(gasp32, 10, 64)
  266. }
  267. }
  268. reMain := int64(0)
  269. if reMainInter, ok := msgData["Remain"]; ok {
  270. if reMain32, ok := reMainInter.(string); ok {
  271. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  272. }
  273. }
  274. if ruMina <= 0 {
  275. if reMainInter, ok := msgData["remain"]; ok {
  276. if reMain32, ok := reMainInter.(string); ok {
  277. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  278. }
  279. }
  280. }
  281. /*cowId := ""
  282. if cowIdInter, ok := msgData["cowid"]; ok {
  283. if cowIdStr, ok := cowIdInter.(string); ok {
  284. cowId = cowIdStr
  285. }
  286. }
  287. csq := int64(0)
  288. if csqInter, ok := msgData["csq"]; ok {
  289. if csq32, ok := csqInter.(string); ok {
  290. csq, _ = strconv.ParseInt(csq32, 10, 64)
  291. }
  292. }
  293. other := int64(0)
  294. if otherInter, ok := msgData["other"]; ok {
  295. if other32, ok := otherInter.(string); ok {
  296. other, _ = strconv.ParseInt(other32, 10, 64)
  297. }
  298. }
  299. nccId := ""
  300. if nccIdInter, ok := msgData["nccid"]; ok {
  301. if nccIdStr, ok := nccIdInter.(string); ok {
  302. nccId = nccIdStr
  303. }
  304. }
  305. */
  306. return []*model.NeckRingOriginal{
  307. {
  308. FirmwareVersion: int32(softVer),
  309. Uuid: uuid,
  310. Frameid: int32(frameId),
  311. ReceiveNumber: imei,
  312. Active: int32(active),
  313. Inactive: int32(inAction),
  314. Rumina: int32(ruMina),
  315. Intake: int32(intake),
  316. Gasp: int32(gasp),
  317. Remain: int32(reMain),
  318. },
  319. }
  320. }