mqtt_handle.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "kpt-pasture/model"
  5. "kpt-pasture/util"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/jinzhu/copier"
  11. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  12. "gitee.com/xuyiping_admin/pkg/xerr"
  13. "go.uber.org/zap"
  14. "gorm.io/gorm"
  15. )
  16. type DataInsertNeckRingLog struct {
  17. NeckRingOriginalData []*model.NeckRingOriginal
  18. NeckRingErrorData []*model.NeckRingError
  19. }
  20. var (
  21. DSMLog = &DataInsertNeckRingLog{
  22. NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
  23. NeckRingErrorData: make([]*model.NeckRingError, 0),
  24. }
  25. pastureMqttMap = make(map[string]int64)
  26. isFindPastureMqttMap bool
  27. mu sync.Mutex
  28. )
  29. func (e *Entry) NeckRingHandle(data []byte) {
  30. newData := e.MsgDataFormat2(data)
  31. if newData == nil {
  32. return
  33. }
  34. if len(newData.NeckRingErrorData) > 0 || len(newData.NeckRingOriginalData) > 0 {
  35. // 写入数据
  36. if err := e.CreatedData(newData); err != nil {
  37. zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", newData))
  38. }
  39. }
  40. return
  41. }
  42. func (e *Entry) FindPastureMqttMap() map[string]int64 {
  43. if isFindPastureMqttMap {
  44. return pastureMqttMap
  45. }
  46. appMqttList := make([]*model.NeckRing, 0)
  47. if err := e.DB.Model(new(model.AppMqtt)).Find(&appMqttList).Error; err != nil {
  48. zaplog.Error("FindPastureMqttMap", zap.Any("err", err))
  49. }
  50. for _, v := range appMqttList {
  51. pastureMqttMap[v.NeckRingNumber] = v.PastureId
  52. }
  53. isFindPastureMqttMap = true
  54. return pastureMqttMap
  55. }
  56. // 处理批量数据
  57. func (e *Entry) processBatch(batchList []*model.NeckRingOriginal) {
  58. // 初始化分类数据
  59. var (
  60. errorData []*model.NeckRingError
  61. originalData []*model.NeckRingOriginal
  62. )
  63. // 分类数据
  64. for _, batch := range batchList {
  65. // 异常脖环数据
  66. if ok := util.IsValidFrameId(batch.Frameid); !ok {
  67. var ed model.NeckRingError
  68. err := copier.Copy(&ed, &batch)
  69. if err != nil {
  70. zaplog.Error("processBatch", zap.Any("copier", err), zap.Any("data", batch))
  71. continue
  72. }
  73. errorData = append(errorData, &ed)
  74. } else {
  75. originalData = append(originalData, batch)
  76. }
  77. }
  78. // 更新日志
  79. DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, errorData...)
  80. DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, originalData...)
  81. // 写入数据
  82. if err := e.CreatedData(DSMLog); err != nil {
  83. zaplog.Error("Failed to create data", zap.Any("err", err), zap.Any("dataList", DSMLog))
  84. }
  85. // 清空日志
  86. DSMLog.NeckRingErrorData = DSMLog.NeckRingErrorData[:0]
  87. DSMLog.NeckRingOriginalData = DSMLog.NeckRingOriginalData[:0]
  88. }
  89. func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
  90. if err := e.DB.Transaction(func(tx *gorm.DB) error {
  91. if len(DSMLog.NeckRingErrorData) > 0 {
  92. if err := e.DB.Model(new(model.NeckRingError)).Create(DSMLog.NeckRingErrorData).Error; err != nil {
  93. return xerr.WithStack(err)
  94. }
  95. }
  96. if len(DSMLog.NeckRingOriginalData) > 0 {
  97. // 批量写入数据
  98. if len(DSMLog.NeckRingOriginalData) > 50 {
  99. if err := e.DB.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData[:50]).Error; err != nil {
  100. return xerr.WithStack(err)
  101. }
  102. time.Sleep(100 * time.Millisecond)
  103. if err := e.DB.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData[50:]).Error; err != nil {
  104. return xerr.WithStack(err)
  105. }
  106. } else {
  107. if err := e.DB.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData).Error; err != nil {
  108. return xerr.WithStack(err)
  109. }
  110. }
  111. }
  112. return nil
  113. }); err != nil {
  114. return xerr.WithStack(err)
  115. }
  116. return nil
  117. }
  118. func (e *Entry) MsgDataFormat2(msg []byte) *DataInsertNeckRingLog {
  119. mu.Lock()
  120. defer mu.Unlock()
  121. neckLogList := &model.NeckRingWrapper{}
  122. if err := json.Unmarshal(msg, neckLogList); err != nil {
  123. zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
  124. return nil
  125. }
  126. if neckLogList.Type == "heartbeat" {
  127. return nil
  128. }
  129. normalOriginal := make([]*model.NeckRingOriginal, 0)
  130. errorOriginal := make([]*model.NeckRingError, 0)
  131. pastureMqttMap = e.FindPastureMqttMap()
  132. for _, neckLog := range neckLogList.NeckRing.NeckPck {
  133. newOriginal := model.NewNeckRingOriginal(neckLog)
  134. if ok := util.IsValidFrameId(neckLog.Frameid); !ok {
  135. var ed model.NeckRingError
  136. if err := copier.Copy(&ed, &newOriginal); err != nil {
  137. zaplog.Error("MsgDataFormat2", zap.Any("copier", err), zap.Any("neckLog", neckLog))
  138. continue
  139. }
  140. errorOriginal = append(errorOriginal, &ed)
  141. } else {
  142. activeDate, hours := util.GetNeckRingActiveTimer(neckLog.Frameid)
  143. newOriginal.ActiveDate = activeDate
  144. newOriginal.Hours = int32(hours)
  145. normalOriginal = append(normalOriginal, newOriginal)
  146. }
  147. }
  148. return &DataInsertNeckRingLog{
  149. NeckRingErrorData: errorOriginal,
  150. NeckRingOriginalData: normalOriginal,
  151. }
  152. }
  153. func (e *Entry) MsgDataFormat(msg []byte) []*model.NeckRingOriginal {
  154. msgData := make(map[string]interface{})
  155. pairs := strings.Split(util.MsgFormat(string(msg)), " ")
  156. for _, pair := range pairs {
  157. parts := strings.SplitN(pair, ":", 2)
  158. if len(parts) != 2 {
  159. continue
  160. }
  161. key, value := parts[0], parts[1]
  162. if len(key) == 0 {
  163. continue
  164. }
  165. msgData[key] = value
  166. }
  167. softVer := int64(0)
  168. if softVerInter, ok := msgData["SOFT_VER"]; ok {
  169. if softVerstr, ok := softVerInter.(string); ok {
  170. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  171. }
  172. }
  173. if softVer <= 0 {
  174. if softVerInter, ok := msgData["soft_ver"]; ok {
  175. if softVerstr, ok := softVerInter.(string); ok {
  176. softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
  177. }
  178. }
  179. }
  180. uuid := ""
  181. if uuidInter, ok := msgData["uuid"]; ok {
  182. if uuidStr, ok := uuidInter.(string); ok {
  183. uuid = uuidStr
  184. }
  185. }
  186. frameId := int64(0)
  187. if frameIdInter, ok := msgData["frameid"]; ok {
  188. if frameId64, ok := frameIdInter.(string); ok {
  189. frameId, _ = strconv.ParseInt(frameId64, 10, 64)
  190. }
  191. }
  192. temp := float64(0)
  193. if tempInter, ok := msgData["Temp"]; ok {
  194. if tempFloat, ok := tempInter.(string); ok {
  195. temp, _ = strconv.ParseFloat(tempFloat, 64)
  196. }
  197. }
  198. if temp <= 0 {
  199. if tempInter, ok := msgData["temp"]; ok {
  200. if tempFloat, ok := tempInter.(string); ok {
  201. temp, _ = strconv.ParseFloat(tempFloat, 64)
  202. }
  203. }
  204. }
  205. imei := ""
  206. if imeiInter, ok := msgData["imei"]; ok {
  207. if imeiStr, ok := imeiInter.(string); ok {
  208. imei = imeiStr
  209. }
  210. }
  211. active := int64(0)
  212. if activeInter, ok := msgData["active"]; ok {
  213. if active32, ok := activeInter.(string); ok {
  214. active, _ = strconv.ParseInt(active32, 10, 64)
  215. }
  216. }
  217. inAction := int64(0)
  218. if inActionInter, ok := msgData["inactive"]; ok {
  219. if inAction32, ok := inActionInter.(string); ok {
  220. inAction, _ = strconv.ParseInt(inAction32, 10, 64)
  221. }
  222. }
  223. ruMina := int64(0)
  224. if ruMinaInter, ok := msgData["Rumina"]; ok {
  225. if ruMina32, ok := ruMinaInter.(string); ok {
  226. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  227. }
  228. }
  229. if ruMina <= 0 {
  230. if ruMinaInter, ok := msgData["rumina"]; ok {
  231. if ruMina32, ok := ruMinaInter.(string); ok {
  232. ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
  233. }
  234. }
  235. }
  236. intake := int64(0)
  237. if intakeInter, ok := msgData["Intake"]; ok {
  238. if intake32, ok := intakeInter.(string); ok {
  239. intake, _ = strconv.ParseInt(intake32, 10, 64)
  240. }
  241. }
  242. if intake <= 0 {
  243. if intakeInter, ok := msgData["intake"]; ok {
  244. if intake32, ok := intakeInter.(string); ok {
  245. intake, _ = strconv.ParseInt(intake32, 10, 64)
  246. }
  247. }
  248. }
  249. gasp := int64(0)
  250. if gaspInter, ok := msgData["gasp"]; ok {
  251. if gasp32, ok := gaspInter.(string); ok {
  252. gasp, _ = strconv.ParseInt(gasp32, 10, 64)
  253. }
  254. }
  255. reMain := int64(0)
  256. if reMainInter, ok := msgData["Remain"]; ok {
  257. if reMain32, ok := reMainInter.(string); ok {
  258. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  259. }
  260. }
  261. if ruMina <= 0 {
  262. if reMainInter, ok := msgData["remain"]; ok {
  263. if reMain32, ok := reMainInter.(string); ok {
  264. reMain, _ = strconv.ParseInt(reMain32, 10, 64)
  265. }
  266. }
  267. }
  268. /*cowId := ""
  269. if cowIdInter, ok := msgData["cowid"]; ok {
  270. if cowIdStr, ok := cowIdInter.(string); ok {
  271. cowId = cowIdStr
  272. }
  273. }
  274. csq := int64(0)
  275. if csqInter, ok := msgData["csq"]; ok {
  276. if csq32, ok := csqInter.(string); ok {
  277. csq, _ = strconv.ParseInt(csq32, 10, 64)
  278. }
  279. }
  280. other := int64(0)
  281. if otherInter, ok := msgData["other"]; ok {
  282. if other32, ok := otherInter.(string); ok {
  283. other, _ = strconv.ParseInt(other32, 10, 64)
  284. }
  285. }
  286. nccId := ""
  287. if nccIdInter, ok := msgData["nccid"]; ok {
  288. if nccIdStr, ok := nccIdInter.(string); ok {
  289. nccId = nccIdStr
  290. }
  291. }
  292. */
  293. return []*model.NeckRingOriginal{
  294. {
  295. FirmwareVersion: int32(softVer),
  296. Uuid: uuid,
  297. Frameid: int32(frameId),
  298. ReceiveNumber: imei,
  299. Active: int32(active),
  300. Inactive: int32(inAction),
  301. Rumina: int32(ruMina),
  302. Intake: int32(intake),
  303. Gasp: int32(gasp),
  304. Remain: int32(reMain),
  305. },
  306. }
  307. }