package mqtt

import (
	"encoding/json"
	"kpt-pasture/model"
	"kpt-pasture/util"
	"strconv"
	"sync"

	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	"gitee.com/xuyiping_admin/pkg/xerr"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

// DataInsertNeckRingLog groups one flushed batch of neck-ring records by how
// NeckRingHandle classified them, ready to be persisted by CreatedData.
type DataInsertNeckRingLog struct {
	// NeckRingOriginalData holds records with a known frame id whose ring is bound.
	NeckRingOriginalData []*model.NeckRingOriginal
	// NeckRingErrorData holds records whose frame id is not present in FrameId.
	NeckRingErrorData []*model.NeckRingOriginal
	// NeckRingUnRegisterData holds records for which NeckRingIsBind returned false.
	NeckRingUnRegisterData []*model.NeckRingOriginal
	// Mx is kept for compatibility with existing users of this type; the
	// functions in this file do not lock it.
	Mx *sync.RWMutex
}

// FrameId is the set of frame ids accepted as valid. The map is used purely
// for membership tests; every key maps to itself.
var FrameId = map[int32]int32{
	1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 8: 8,
	11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 18: 18,
	21: 21, 22: 22, 23: 23, 24: 24, 25: 25, 26: 26, 28: 28,
	31: 31, 32: 32, 33: 33, 34: 34, 35: 35, 36: 36, 38: 38,
	41: 41, 42: 42, 43: 43, 44: 44, 45: 45, 46: 46, 48: 48,
	51: 51, 52: 52, 53: 53, 54: 54, 55: 55, 56: 56, 58: 58,
	61: 61, 62: 62, 63: 63, 64: 64, 65: 65, 66: 66, 68: 68,
	71: 71, 72: 72, 73: 73, 74: 74, 75: 75, 76: 76, 78: 78,
	81: 81, 82: 82, 83: 83, 84: 84, 85: 85, 86: 86, 88: 88,
	91: 91, 92: 92, 93: 93, 94: 94, 95: 95, 96: 96, 98: 98,
	101: 101, 102: 102, 103: 103, 104: 104, 105: 105, 106: 106, 108: 108,
	111: 111, 112: 112, 113: 113, 114: 114, 115: 115, 116: 116, 118: 118,
}

var (
	// batchSize is the number of parsed records accumulated before a flush.
	batchSize = 10
	// batchList accumulates parsed records across NeckRingHandle calls.
	batchList = make([]*model.NeckRingOriginal, 0, batchSize)
	// batchMu guards batchList, which is shared by every NeckRingHandle call.
	batchMu sync.Mutex
)

// NeckRingHandle parses one raw message and appends the resulting record to
// the shared batch. Once the batch holds batchSize records it is classified
// into error / unregistered / normal records and written through CreatedData.
//
// A message that MsgDataFormat rejects (nil result) is ignored. If the write
// fails, the error is logged and the batch is dropped.
func (e *Entry) NeckRingHandle(data []byte) {
	newData := e.MsgDataFormat(data)
	if newData == nil {
		return
	}

	// Take ownership of a full batch while holding the lock and release it
	// before any database work, so other callers are never blocked on I/O.
	batchMu.Lock()
	batchList = append(batchList, newData)
	if len(batchList) < batchSize {
		batchMu.Unlock()
		return
	}
	pending := batchList
	batchList = make([]*model.NeckRingOriginal, 0, batchSize)
	batchMu.Unlock()

	DSMLog := &DataInsertNeckRingLog{
		NeckRingOriginalData:   make([]*model.NeckRingOriginal, 0, len(pending)),
		NeckRingErrorData:      make([]*model.NeckRingOriginal, 0),
		NeckRingUnRegisterData: make([]*model.NeckRingOriginal, 0),
		Mx:                     &sync.RWMutex{},
	}

	for _, batch := range pending {
		// Abnormal neck-ring data: the frame id is not in the accepted set.
		if _, ok := FrameId[batch.FrameId]; !ok {
			DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, batch)
			continue
		}
		// Neck-ring data from a ring that is not bound.
		if ok := e.NeckRingIsBind(batch.Imei); !ok {
			DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, batch)
			continue
		}
		// Normal neck-ring data.
		DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, batch)
	}

	if err := e.CreatedData(DSMLog); err != nil {
		zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
	}
}

// CreatedData writes the three record groups of DSMLog inside a single
// database transaction, in the order error, normal, unregistered. Empty
// groups are skipped. Any insert error aborts the transaction and is
// returned wrapped with a stack trace.
func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
	err := e.DB.Transaction(func(tx *gorm.DB) error {
		groups := [][]*model.NeckRingOriginal{
			DSMLog.NeckRingErrorData,
			DSMLog.NeckRingOriginalData,
			DSMLog.NeckRingUnRegisterData,
		}
		for _, rows := range groups {
			if len(rows) == 0 {
				continue
			}
			// Use tx rather than e.DB: only statements issued on tx take
			// part in the transaction and are rolled back on failure.
			if err := tx.Create(rows).Error; err != nil {
				return xerr.WithStack(err)
			}
		}
		return nil
	})
	if err != nil {
		return xerr.WithStack(err)
	}
	return nil
}

// MsgDataFormat decodes a JSON message into a Behavior and converts it to a
// model.NeckRingOriginal. It returns nil when the decoded Imei is empty.
//
// A decode error is logged but does not by itself reject the message: the
// record is still built if Imei was populated.
func (e *Entry) MsgDataFormat(msg []byte) *model.NeckRingOriginal {
	neckLog := &Behavior{}
	if err := json.Unmarshal(msg, neckLog); err != nil {
		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
	}
	if neckLog.Imei == "" {
		return nil
	}

	activeDate, hours := util.GetNeckRingActiveTimer(neckLog.FrameId)

	// BAT is rendered as hexadecimal text and that text is re-read as a
	// decimal number (e.g. 0x36 -> "36" -> 36). The parse error is ignored on
	// purpose: when the hex text contains a-f the voltage is stored as 0.
	voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(neckLog.BAT), 16), 10, 64)

	// Frame ids ending in 8 are stored as two-hour records; all others as
	// twenty-minute records.
	activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
	if neckLog.FrameId%10 == 8 {
		activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
	}

	// NOTE(review): Imei is filled from ECowId and ReceiveNumber from Imei,
	// and High/Active are filled from Activitys/High. These mappings are kept
	// exactly as they were; confirm they are intentional.
	return &model.NeckRingOriginal{
		Uuid:           neckLog.UUID,
		Imei:           neckLog.ECowId,
		ActiveDate:     activeDate,
		Hours:          int32(hours),
		FrameId:        neckLog.FrameId,
		Rumina:         neckLog.RuMina,
		Intake:         neckLog.Intake,
		Inactive:       neckLog.Inactive,
		Other:          neckLog.Other,
		High:           neckLog.Activitys,
		Active:         neckLog.High,
		Voltage:        int32(voltage),
		Version:        neckLog.Sver,
		Remain:         neckLog.Remain,
		ReceiveNumber:  neckLog.Imei,
		ActiveDateType: activeDateTimeType,
	}
}