handle.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "strconv"
  7. "sync"
  8. pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
  9. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  10. "gitee.com/xuyiping_admin/pkg/xerr"
  11. "go.uber.org/zap"
  12. "gorm.io/gorm"
  13. )
  14. type DataInsertNeckRingLog struct {
  15. NeckRingOriginalData []*model.NeckRingOriginal
  16. NeckRingErrorData []*model.NeckRingOriginal
  17. NeckRingUnRegisterData []*model.NeckRingOriginal
  18. Mx *sync.RWMutex
  19. }
  20. var FrameId = map[int32]int32{
  21. 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 8: 8,
  22. 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 18: 18,
  23. 21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 28: 28,
  24. 31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 38: 38,
  25. 41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 48: 48,
  26. 51: 51, 52: 52, 53: 53, 54: 54, 55: 55, 56: 56, 58: 58,
  27. 61: 61, 62: 62, 63: 63, 64: 64, 65: 65, 66: 66, 68: 68,
  28. 71: 71, 72: 72, 73: 73, 74: 74, 75: 75, 76: 76, 78: 78,
  29. 81: 81, 82: 82, 83: 83, 84: 84, 85: 85, 86: 86, 88: 88,
  30. 91: 91, 92: 92, 93: 93, 94: 94, 95: 95, 96: 96, 98: 98,
  31. 101: 101, 102: 102, 103: 103, 104: 104, 105: 105, 106: 106, 108: 108,
  32. 111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118,
  33. }
  34. var (
  35. batchSize = 10
  36. batchList = make([]*model.NeckRingOriginal, 0, batchSize)
  37. )
  38. func (e *Entry) NeckRingHandle(data []byte) {
  39. DSMLog := &DataInsertNeckRingLog{
  40. NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
  41. NeckRingErrorData: make([]*model.NeckRingOriginal, 0),
  42. NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
  43. Mx: &sync.RWMutex{},
  44. }
  45. newData, _ := e.MsgDataFormat(data)
  46. if newData != nil && len(newData) > 0 {
  47. batchList = append(batchList, newData...)
  48. if len(batchList) >= batchSize {
  49. DSMLog.Mx.Lock()
  50. for _, batch := range batchList {
  51. // 异常脖环数据
  52. if _, ok := FrameId[batch.FrameId]; !ok {
  53. DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
  54. continue
  55. }
  56. // 未佩戴的脖环数据
  57. if ok := e.NeckRingIsBind(batch.Imei); !ok {
  58. DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
  59. continue
  60. }
  61. // 正常脖环数据
  62. DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
  63. }
  64. if err := e.CreatedData(DSMLog); err != nil {
  65. zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
  66. }
  67. DSMLog.Mx.Unlock()
  68. DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
  69. DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
  70. DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
  71. batchList = batchList[:0]
  72. }
  73. }
  74. }
  75. func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
  76. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  77. if len(DSMLog.NeckRingUnRegisterData) > 0 {
  78. if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
  79. return xerr.WithStack(err)
  80. }
  81. }
  82. if len(DSMLog.NeckRingOriginalData) > 0 {
  83. if err := e.DB.Create(DSMLog.NeckRingOriginalData).Error; err != nil {
  84. return xerr.WithStack(err)
  85. }
  86. }
  87. if len(DSMLog.NeckRingUnRegisterData) > 0 {
  88. if err := e.DB.Create(DSMLog.NeckRingUnRegisterData).Error; err != nil {
  89. return xerr.WithStack(err)
  90. }
  91. }
  92. return nil
  93. }); err != nil {
  94. return xerr.WithStack(err)
  95. }
  96. return nil
  97. }
  98. func (e *Entry) MsgDataFormat(msg []byte) ([]*model.NeckRingOriginal, error) {
  99. neckLog := &NeckRingWrapper{}
  100. if err := json.Unmarshal(msg, neckLog); err != nil {
  101. zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
  102. }
  103. batchList = make([]*model.NeckRingOriginal, 0)
  104. if len(neckLog.NeckRing.NeckPck) > 0 {
  105. for _, v := range neckLog.NeckRing.NeckPck {
  106. // 存储到数据库
  107. activeDate, hours := util.GetNeckRingActiveTimer(v.FrameId)
  108. voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(v.BAT), 16), 10, 64)
  109. activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
  110. if v.FrameId%10 == 8 {
  111. activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
  112. }
  113. newData := &model.NeckRingOriginal{
  114. Uuid: v.UUID,
  115. Imei: v.ECowId,
  116. ActiveDate: activeDate,
  117. Hours: int32(hours),
  118. FrameId: v.FrameId,
  119. Rumina: v.RuMina,
  120. Intake: v.Intake,
  121. Inactive: v.Inactive,
  122. Other: v.Other,
  123. High: v.Activitys,
  124. Active: v.High,
  125. Voltage: int32(voltage),
  126. Version: v.Sver,
  127. Remain: v.Remain,
  128. ReceiveNumber: v.Imei,
  129. ActiveDateType: activeDateTimeType,
  130. }
  131. batchList = append(batchList, newData)
  132. }
  133. }
  134. return batchList, nil
  135. }