123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- package mqtt
- import (
- "context"
- "encoding/json"
- "kpt-pasture/model"
- "kpt-pasture/util"
- "os"
- "os/signal"
- "strconv"
- "sync"
- "syscall"
- "time"
- pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- "gitee.com/xuyiping_admin/pkg/xerr"
- "go.uber.org/zap"
- "gorm.io/gorm"
- )
- type DataInsertNeckRingLog struct {
- NeckRingOriginalData []*model.NeckRingOriginal
- NeckRingErrorData []*model.NeckRingOriginal
- NeckRingUnRegisterData []*model.NeckRingOriginal
- Mx *sync.RWMutex
- }
// FrameId is the lookup table of accepted frame ids. Every accepted id maps
// to itself, so the map is effectively used as a set.
//
// An id is accepted when its last decimal digit is one of 1-6 or 8 and the
// remaining leading part is in the range 0..11, i.e. 1-6, 8, 11-16, 18, ...,
// 111-116, 118 (84 entries in total).
var FrameId = func() map[int32]int32 {
	const groups = 12 // leading part of the id: 0..11
	units := [...]int32{1, 2, 3, 4, 5, 6, 8}

	table := make(map[int32]int32, groups*len(units))
	for group := int32(0); group < groups; group++ {
		for _, unit := range units {
			id := group*10 + unit
			table[id] = id
		}
	}
	return table
}()
- func (e *Entry) Subscribe(ctx context.Context, msg <-chan []byte) {
- DSMLog := &DataInsertNeckRingLog{
- NeckRingOriginalData: make([]*model.NeckRingOriginal, 0),
- NeckRingErrorData: make([]*model.NeckRingOriginal, 0),
- NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
- Mx: &sync.RWMutex{},
- }
- batchSize := 20
- batchList := make([]*model.NeckRingOriginal, 0, batchSize)
- sc := make(chan os.Signal, 1)
- signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
- // 设置5分钟超时
- tc := time.After(5 * time.Minute)
- for {
- select {
- case data := <-msg:
- newData, err := e.MsgDataFormat(data)
- if err != nil {
- continue
- }
- batchList = append(batchList, newData...)
- if len(batchList) >= batchSize {
- DSMLog.Mx.Lock()
- for _, batch := range batchList {
- // 异常脖环数据
- if _, ok := FrameId[batch.FrameId]; !ok {
- DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
- continue
- }
- // 未佩戴的脖环数据
- if ok := e.NeckRingIsBind(batch.Imei); !ok {
- DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
- continue
- }
- // 正常脖环数据
- DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
- }
- if err = e.CreatedData(DSMLog); err != nil {
- zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
- }
- DSMLog.Mx.Unlock()
- DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
- DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
- DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
- batchList = batchList[:0]
- }
- // 优雅退出
- case <-sc:
- if err := e.CreatedData(DSMLog); err != nil {
- zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
- }
- case <-tc:
- if err := e.CreatedData(DSMLog); err != nil {
- zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
- }
- zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.NeckRingOriginalData))
- DSMLog.NeckRingErrorData = make([]*model.NeckRingOriginal, 0)
- DSMLog.NeckRingOriginalData = make([]*model.NeckRingOriginal, 0)
- DSMLog.NeckRingUnRegisterData = make([]*model.NeckRingOriginal, 0)
- }
- }
- }
- func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
- if err := e.DB.Transaction(func(tx *gorm.DB) error {
- if len(DSMLog.NeckRingUnRegisterData) > 0 {
- if err := e.DB.Create(DSMLog.NeckRingErrorData).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- if len(DSMLog.NeckRingOriginalData) > 0 {
- if err := e.DB.Create(DSMLog.NeckRingOriginalData).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- if len(DSMLog.NeckRingUnRegisterData) > 0 {
- if err := e.DB.Create(DSMLog.NeckRingUnRegisterData).Error; err != nil {
- return xerr.WithStack(err)
- }
- }
- return nil
- }); err != nil {
- return xerr.WithStack(err)
- }
- return nil
- }
- func (e *Entry) MsgDataFormat(msg []byte) ([]*model.NeckRingOriginal, error) {
- neckLog := &NeckRingWrapper{}
- if err := json.Unmarshal(msg, neckLog); err != nil {
- zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
- }
- batchList := make([]*model.NeckRingOriginal, 0)
- if len(neckLog.NeckRing.NeckPck) > 0 {
- for _, v := range neckLog.NeckRing.NeckPck {
- // 存储到数据库
- activeDate, hours := util.GetNeckRingActiveTimer(v.FrameId)
- voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(v.BAT), 16), 10, 64)
- activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
- if v.FrameId%10 == 8 {
- activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
- }
- newData := &model.NeckRingOriginal{
- Uuid: v.UUID,
- Imei: v.ECowId,
- ActiveDate: activeDate,
- Hours: int32(hours),
- FrameId: v.FrameId,
- Rumina: v.RuMina,
- Intake: v.Intake,
- Inactive: v.Inactive,
- Other: v.Other,
- High: v.Activitys,
- Active: v.High,
- Voltage: int32(voltage),
- Version: v.Sver,
- Remain: v.Remain,
- ReceiveNumber: v.Imei,
- ActiveDateType: activeDateTimeType,
- }
- batchList = append(batchList, newData)
- }
- }
- return batchList, nil
- }
|