handle.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "strconv"
  7. "sync"
  8. pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
  9. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  10. "gitee.com/xuyiping_admin/pkg/xerr"
  11. "go.uber.org/zap"
  12. "gorm.io/gorm"
  13. )
  14. type DataInsertNeckRingLog struct {
  15. NeckRingOriginalData []*model.NeckRingOriginal
  16. NeckRingErrorData []*model.NeckRingOriginal
  17. NeckRingUnRegisterData []*model.NeckRingOriginal
  18. Mx *sync.RWMutex
  19. }
  20. var FrameId = map[int32]int32{
  21. 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 8: 8,
  22. 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 18: 18,
  23. 21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 28: 28,
  24. 31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 38: 38,
  25. 41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 48: 48,
  26. 51: 51, 52: 52, 53: 53, 54: 54, 55: 55, 56: 56, 58: 58,
  27. 61: 61, 62: 62, 63: 63, 64: 64, 65: 65, 66: 66, 68: 68,
  28. 71: 71, 72: 72, 73: 73, 74: 74, 75: 75, 76: 76, 78: 78,
  29. 81: 81, 82: 82, 83: 83, 84: 84, 85: 85, 86: 86, 88: 88,
  30. 91: 91, 92: 92, 93: 93, 94: 94, 95: 95, 96: 96, 98: 98,
  31. 101: 101, 102: 102, 103: 103, 104: 104, 105: 105, 106: 106, 108: 108,
  32. 111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118,
  33. }
  34. var (
  35. batchSize = 10
  36. batchList = make([]*model.NeckRingOriginal, 0, batchSize)
  37. )
  38. func (e *Entry) NeckRingHandle(data []byte) {
  39. DSMLog := &DataInsertNeckRingLog{
  40. NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
  41. NeckRingErrorData: make([]*model.NeckRingOriginal, 0),
  42. NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
  43. Mx: &sync.RWMutex{},
  44. }
  45. newData := e.MsgDataFormat(data)
  46. if newData != nil {
  47. batchList = append(batchList, newData)
  48. if len(batchList) >= batchSize {
  49. DSMLog.Mx.Lock()
  50. for _, batch := range batchList {
  51. // 异常脖环数据
  52. if _, ok := FrameId[batch.FrameId]; !ok {
  53. DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
  54. continue
  55. }
  56. // 未佩戴的脖环数据
  57. if ok := e.NeckRingIsBind(batch.Imei); !ok {
  58. DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
  59. continue
  60. }
  61. // 正常脖环数据
  62. DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
  63. }
  64. if err := e.CreatedData(DSMLog); err != nil {
  65. zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
  66. }
  67. DSMLog.Mx.Unlock()
  68. DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
  69. DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
  70. DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
  71. batchList = batchList[:0]
  72. }
  73. }
  74. }
  75. func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
  76. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  77. if len(DSMLog.NeckRingUnRegisterData) > 0 {
  78. if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
  79. return xerr.WithStack(err)
  80. }
  81. }
  82. if len(DSMLog.NeckRingOriginalData) > 0 {
  83. if err := e.DB.Create(DSMLog.NeckRingOriginalData).Error; err != nil {
  84. return xerr.WithStack(err)
  85. }
  86. }
  87. if len(DSMLog.NeckRingUnRegisterData) > 0 {
  88. if err := e.DB.Create(DSMLog.NeckRingUnRegisterData).Error; err != nil {
  89. return xerr.WithStack(err)
  90. }
  91. }
  92. return nil
  93. }); err != nil {
  94. return xerr.WithStack(err)
  95. }
  96. return nil
  97. }
  98. func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
  99. neckLog := &Behavior{}
  100. if err := json.Unmarshal(msg, neckLog); err != nil {
  101. zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
  102. }
  103. if neckLog.Imei != "" {
  104. // 存储到数据库
  105. activeDate, hours := util.GetNeckRingActiveTimer(neckLog.FrameId)
  106. voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(neckLog.BAT), 16), 10, 64)
  107. activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
  108. if neckLog.FrameId%10 == 8 {
  109. activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
  110. }
  111. return &model.NeckRingOriginal{
  112. Uuid: neckLog.UUID,
  113. Imei: neckLog.ECowId,
  114. ActiveDate: activeDate,
  115. Hours: int32(hours),
  116. FrameId: neckLog.FrameId,
  117. Rumina: neckLog.RuMina,
  118. Intake: neckLog.Intake,
  119. Inactive: neckLog.Inactive,
  120. Other: neckLog.Other,
  121. High: neckLog.Activitys,
  122. Active: neckLog.High,
  123. Voltage: int32(voltage),
  124. Version: neckLog.Sver,
  125. Remain: neckLog.Remain,
  126. ReceiveNumber: neckLog.Imei,
  127. ActiveDateType: activeDateTimeType,
  128. }
  129. }
  130. return nil
  131. }