handle.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
  10. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  11. "gitee.com/xuyiping_admin/pkg/xerr"
  12. "go.uber.org/zap"
  13. "gorm.io/gorm"
  14. )
  15. type DataInsertNeckRingLog struct {
  16. NeckRingOriginalData []*model.NeckRingOriginal
  17. NeckRingErrorData []*model.NeckRingOriginal
  18. NeckRingUnRegisterData []*model.NeckRingOriginal
  19. Mx *sync.RWMutex
  20. }
  21. var FrameId = map[int32]int32{
  22. 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 8: 8,
  23. 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 18: 18,
  24. 21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 28: 28,
  25. 31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 38: 38,
  26. 41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 48: 48,
  27. 51: 51, 52: 52, 53: 53, 54: 54, 55: 55, 56: 56, 58: 58,
  28. 61: 61, 62: 62, 63: 63, 64: 64, 65: 65, 66: 66, 68: 68,
  29. 71: 71, 72: 72, 73: 73, 74: 74, 75: 75, 76: 76, 78: 78,
  30. 81: 81, 82: 82, 83: 83, 84: 84, 85: 85, 86: 86, 88: 88,
  31. 91: 91, 92: 92, 93: 93, 94: 94, 95: 95, 96: 96, 98: 98,
  32. 101: 101, 102: 102, 103: 103, 104: 104, 105: 105, 106: 106, 108: 108,
  33. 111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118,
  34. }
  35. var (
  36. batchSize = 10
  37. batchList = make([]*model.NeckRingOriginal, 0, batchSize)
  38. )
  39. func (e *Entry) NeckRingHandle(data []byte) {
  40. DSMLog := &DataInsertNeckRingLog{
  41. NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
  42. NeckRingErrorData: make([]*model.NeckRingOriginal, 0),
  43. NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
  44. Mx: &sync.RWMutex{},
  45. }
  46. newData := e.MsgDataFormat(data)
  47. if newData != nil {
  48. batchList = append(batchList, newData)
  49. if len(batchList) >= batchSize {
  50. DSMLog.Mx.Lock()
  51. for _, batch := range batchList {
  52. // 异常脖环数据
  53. if _, ok := FrameId[batch.FrameId]; !ok {
  54. DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
  55. continue
  56. }
  57. // 未佩戴的脖环数据
  58. if ok := e.NeckRingIsBind(batch.Imei); !ok {
  59. DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
  60. continue
  61. }
  62. // 正常脖环数据
  63. DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
  64. }
  65. if err := e.CreatedData(DSMLog); err != nil {
  66. zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
  67. }
  68. DSMLog.Mx.Unlock()
  69. DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
  70. DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
  71. DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
  72. batchList = batchList[:0]
  73. }
  74. }
  75. }
  76. func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
  77. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  78. if len(DSMLog.NeckRingErrorData) > 0 {
  79. if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
  80. return xerr.WithStack(err)
  81. }
  82. }
  83. if len(DSMLog.NeckRingOriginalData) > 0 {
  84. if err := e.DB.Create(DSMLog.NeckRingOriginalData).Error; err != nil {
  85. return xerr.WithStack(err)
  86. }
  87. }
  88. if len(DSMLog.NeckRingUnRegisterData) > 0 {
  89. if err := e.DB.Create(DSMLog.NeckRingUnRegisterData).Error; err != nil {
  90. return xerr.WithStack(err)
  91. }
  92. }
  93. return nil
  94. }); err != nil {
  95. return xerr.WithStack(err)
  96. }
  97. return nil
  98. }
  99. func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
  100. msgData := make(map[string]interface{})
  101. pairs := strings.Split(util.MsgFormat(string(msg)), " ")
  102. for _, pair := range pairs {
  103. parts := strings.SplitN(pair, ":", 2)
  104. if len(parts) != 2 {
  105. continue
  106. }
  107. key, value := parts[0], parts[1]
  108. if len(key) == 0 {
  109. continue
  110. }
  111. msgData[key] = value
  112. }
  113. softVer := int64(0)
  114. if softVerInter, ok := msgData["SOFT_VER"]; ok {
  115. if softVerstr, ok := softVerInter.(string); ok {
  116. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  117. }
  118. }
  119. if softVer <= 0 {
  120. if softVerInter, ok := msgData["soft_ver"]; ok {
  121. if softVerstr, ok := softVerInter.(string); ok {
  122. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  123. }
  124. }
  125. }
  126. uuid := ""
  127. if uuidInter, ok := msgData["uuid"]; ok {
  128. if uuidStr, ok := uuidInter.(string); ok {
  129. uuid = uuidStr
  130. }
  131. }
  132. frameId := int64(0)
  133. if frameIdInter, ok := msgData["frameid"]; ok {
  134. if frameId64, ok := frameIdInter.(string); ok {
  135. frameId, _ = strconv.ParseInt(frameId64, 10, 64)
  136. }
  137. }
  138. cowId := ""
  139. if cowIdInter, ok := msgData["cowid"]; ok {
  140. if cowIdStr, ok := cowIdInter.(string); ok {
  141. cowId = cowIdStr
  142. }
  143. }
  144. csq := int64(0)
  145. if csqInter, ok := msgData["csq"]; ok {
  146. if csq32, ok := csqInter.(string); ok {
  147. csq, _ = strconv.ParseInt(csq32, 10, 64)
  148. }
  149. }
  150. temp := float64(0)
  151. if tempInter, ok := msgData["Temp"]; ok {
  152. if tempFloat, ok := tempInter.(string); ok {
  153. temp, _ = strconv.ParseFloat(tempFloat, 64)
  154. }
  155. }
  156. if temp <= 0 {
  157. if tempInter, ok := msgData["temp"]; ok {
  158. if tempFloat, ok := tempInter.(string); ok {
  159. temp, _ = strconv.ParseFloat(tempFloat, 64)
  160. }
  161. }
  162. }
  163. imei := ""
  164. if imeiInter, ok := msgData["imei"]; ok {
  165. if imeiStr, ok := imeiInter.(string); ok {
  166. imei = imeiStr
  167. }
  168. }
  169. active := int64(0)
  170. if activeInter, ok := msgData["active"]; ok {
  171. if active32, ok := activeInter.(string); ok {
  172. active, _ = strconv.ParseInt(active32, 10, 64)
  173. }
  174. }
  175. inAction := int64(0)
  176. if inActionInter, ok := msgData["inactive"]; ok {
  177. if inAction32, ok := inActionInter.(string); ok {
  178. inAction, _ = strconv.ParseInt(inAction32, 10, 64)
  179. }
  180. }
  181. ruMina := int64(0)
  182. if ruMinaInter, ok := msgData["Rumina"]; ok {
  183. if ruMina32, ok := ruMinaInter.(string); ok {
  184. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  185. }
  186. }
  187. if ruMina <= 0 {
  188. if ruMinaInter, ok := msgData["rumina"]; ok {
  189. if ruMina32, ok := ruMinaInter.(string); ok {
  190. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  191. }
  192. }
  193. }
  194. intake := int64(0)
  195. if intakeInter, ok := msgData["Intake"]; ok {
  196. if intake32, ok := intakeInter.(string); ok {
  197. intake, _ = strconv.ParseInt(intake32, 10, 64)
  198. }
  199. }
  200. if intake <= 0 {
  201. if intakeInter, ok := msgData["intake"]; ok {
  202. if intake32, ok := intakeInter.(string); ok {
  203. intake, _ = strconv.ParseInt(intake32, 10, 64)
  204. }
  205. }
  206. }
  207. gasp := int64(0)
  208. if gaspInter, ok := msgData["gasp"]; ok {
  209. if gasp32, ok := gaspInter.(string); ok {
  210. gasp, _ = strconv.ParseInt(gasp32, 10, 64)
  211. }
  212. }
  213. other := int64(0)
  214. if otherInter, ok := msgData["other"]; ok {
  215. if other32, ok := otherInter.(string); ok {
  216. other, _ = strconv.ParseInt(other32, 10, 64)
  217. }
  218. }
  219. reMain := int64(0)
  220. if reMainInter, ok := msgData["Remain"]; ok {
  221. if reMain32, ok := reMainInter.(string); ok {
  222. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  223. }
  224. }
  225. if ruMina <= 0 {
  226. if reMainInter, ok := msgData["remain"]; ok {
  227. if reMain32, ok := reMainInter.(string); ok {
  228. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  229. }
  230. }
  231. }
  232. nccId := ""
  233. if nccIdInter, ok := msgData["nccid"]; ok {
  234. if nccIdStr, ok := nccIdInter.(string); ok {
  235. nccId = nccIdStr
  236. }
  237. }
  238. return &model.NeckRingOriginal{
  239. Version: int32(softVer),
  240. Uuid: uuid,
  241. CowId: cowId,
  242. FrameId: int32(frameId),
  243. Csq: int32(csq),
  244. Temp: int32(temp * 100),
  245. Imei: imei,
  246. Active: int32(active),
  247. Inactive: int32(inAction),
  248. Rumina: int32(ruMina),
  249. Intake: int32(intake),
  250. Gasp: int32(gasp),
  251. Other: int32(other),
  252. Remain: int32(reMain),
  253. Nccid: nccId,
  254. }
  255. }
  256. func (e *Entry) MsgDataFormat2(msg []byte) *model.NeckRingOriginal {
  257. neckLog := &Behavior{}
  258. if err := json.Unmarshal(msg, neckLog); err != nil {
  259. zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
  260. }
  261. if neckLog.Imei != "" {
  262. // 存储到数据库
  263. activeDate, hours := util.GetNeckRingActiveTimer(neckLog.FrameId)
  264. voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(neckLog.BAT), 16), 10, 64)
  265. activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
  266. if neckLog.FrameId%10 == 8 {
  267. activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
  268. }
  269. return &model.NeckRingOriginal{
  270. Uuid: neckLog.UUID,
  271. Imei: neckLog.ECowId,
  272. ActiveDate: activeDate,
  273. Hours: int32(hours),
  274. FrameId: neckLog.FrameId,
  275. Rumina: neckLog.RuMina,
  276. Intake: neckLog.Intake,
  277. Inactive: neckLog.Inactive,
  278. Other: neckLog.Other,
  279. High: neckLog.Activitys,
  280. Active: neckLog.High,
  281. Voltage: int32(voltage),
  282. Version: neckLog.Sver,
  283. Remain: neckLog.Remain,
  284. ReceiveNumber: neckLog.Imei,
  285. ActiveDateType: activeDateTimeType,
  286. }
  287. }
  288. return nil
  289. }