mqtt.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package mqtt
  2. import (
  3. "context"
  4. "encoding/json"
  5. "kpt-pasture/model"
  6. "kpt-pasture/util"
  7. "os"
  8. "os/signal"
  9. "strconv"
  10. "sync"
  11. "syscall"
  12. "time"
  13. pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
  14. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  15. "gitee.com/xuyiping_admin/pkg/xerr"
  16. "go.uber.org/zap"
  17. "gorm.io/gorm"
  18. )
  19. type DataInsertNeckRingLog struct {
  20. NeckRingOriginalData []*model.NeckRingOriginal
  21. NeckRingErrorData []*model.NeckRingOriginal
  22. NeckRingUnRegisterData []*model.NeckRingOriginal
  23. Mx *sync.RWMutex
  24. }
  // FrameId is the set of frame ids accepted as well-formed neck-ring data;
  // every key maps to itself. The ids come in twelve groups of ten
  // (0-9, 10-19, ... 110-119) and within each group only the last digits
  // 1, 2, 3, 4, 5, 6 and 8 are valid, for 84 entries in total.
  var FrameId = func() map[int32]int32 {
  	const groups = 12
  	lastDigits := [...]int32{1, 2, 3, 4, 5, 6, 8}

  	ids := make(map[int32]int32, groups*len(lastDigits))
  	for base := int32(0); base < groups*10; base += 10 {
  		for _, digit := range lastDigits {
  			id := base + digit
  			ids[id] = id
  		}
  	}
  	return ids
  }()
  39. func (e *Entry) Subscribe(ctx context.Context, msg <-chan []byte) {
  40. DSMLog := &DataInsertNeckRingLog{
  41. NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
  42. NeckRingErrorData: make([]*model.NeckRingOriginal, 0),
  43. NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
  44. Mx: &sync.RWMutex{},
  45. }
  46. batchSize := 20
  47. batchList := make([]*model.NeckRingOriginal, 0, batchSize)
  48. sc := make(chan os.Signal, 1)
  49. signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  50. // 设置5分钟超时
  51. tc := time.After(5 * time.Minute)
  52. for {
  53. select {
  54. case data := <-msg:
  55. newData, err := e.MsgDataFormat(data)
  56. if err != nil {
  57. continue
  58. }
  59. batchList = append(batchList, newData...)
  60. if len(batchList) >= batchSize {
  61. DSMLog.Mx.Lock()
  62. for _, batch := range batchList {
  63. // 异常脖环数据
  64. if _, ok := FrameId[batch.FrameId]; !ok {
  65. DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
  66. continue
  67. }
  68. // 未佩戴的脖环数据
  69. if ok := e.NeckRingIsBind(batch.Imei); !ok {
  70. DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
  71. continue
  72. }
  73. // 正常脖环数据
  74. DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
  75. }
  76. if err = e.CreatedData(DSMLog); err != nil {
  77. zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
  78. }
  79. DSMLog.Mx.Unlock()
  80. DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
  81. DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
  82. DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
  83. batchList = batchList[:0]
  84. }
  85. // 优雅退出
  86. case <-sc:
  87. if err := e.CreatedData(DSMLog); err != nil {
  88. zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
  89. }
  90. case <-tc:
  91. if err := e.CreatedData(DSMLog); err != nil {
  92. zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
  93. }
  94. zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.NeckRingOriginalData))
  95. DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
  96. DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
  97. DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
  98. }
  99. }
  100. }
  101. func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
  102. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  103. if len(DSMLog.NeckRingUnRegisterData) > 0 {
  104. if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
  105. return xerr.WithStack(err)
  106. }
  107. }
  108. if len(DSMLog.NeckRingOriginalData) > 0 {
  109. if err := e.DB.Create(DSMLog.NeckRingOriginalData).Error; err != nil {
  110. return xerr.WithStack(err)
  111. }
  112. }
  113. if len(DSMLog.NeckRingUnRegisterData) > 0 {
  114. if err := e.DB.Create(DSMLog.NeckRingUnRegisterData).Error; err != nil {
  115. return xerr.WithStack(err)
  116. }
  117. }
  118. return nil
  119. }); err != nil {
  120. return xerr.WithStack(err)
  121. }
  122. return nil
  123. }
  124. func (e *Entry) MsgDataFormat(msg []byte) ([]*model.NeckRingOriginal, error) {
  125. neckLog := &NeckRingWrapper{}
  126. if err := json.Unmarshal(msg, neckLog); err != nil {
  127. zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
  128. }
  129. batchList := make([]*model.NeckRingOriginal, 0)
  130. if len(neckLog.NeckRing.NeckPck) > 0 {
  131. for _, v := range neckLog.NeckRing.NeckPck {
  132. // 存储到数据库
  133. activeDate, hours := util.GetNeckRingActiveTimer(v.FrameId)
  134. voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(v.BAT), 16), 10, 64)
  135. activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
  136. if v.FrameId%10 == 8 {
  137. activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
  138. }
  139. newData := &model.NeckRingOriginal{
  140. Uuid: v.UUID,
  141. Imei: v.ECowId,
  142. ActiveDate: activeDate,
  143. Hours: int32(hours),
  144. FrameId: v.FrameId,
  145. Rumina: v.RuMina,
  146. Intake: v.Intake,
  147. Inactive: v.Inactive,
  148. Other: v.Other,
  149. High: v.Activitys,
  150. Active: v.High,
  151. Voltage: int32(voltage),
  152. Version: v.Sver,
  153. Remain: v.Remain,
  154. ReceiveNumber: v.Imei,
  155. ActiveDateType: activeDateTimeType,
  156. }
  157. batchList = append(batchList, newData)
  158. }
  159. }
  160. return batchList, nil
  161. }