package mqtt

import (
	"context"
	"encoding/json"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"

	"kpt-pasture/model"
	"kpt-pasture/util"

	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"
	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	"gitee.com/xuyiping_admin/pkg/xerr"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

// DataInsertNeckRingLog holds one batch of neck-ring records, split into the
// three groups that CreatedData persists.
type DataInsertNeckRingLog struct {
	// NeckRingOriginalData holds records with a known frame id whose IMEI
	// passes NeckRingIsBind.
	NeckRingOriginalData []*model.NeckRingOriginal
	// NeckRingErrorData holds records whose frame id is not a key of FrameId.
	NeckRingErrorData []*model.NeckRingOriginal
	// NeckRingUnRegisterData holds records with a known frame id whose IMEI
	// fails NeckRingIsBind.
	NeckRingUnRegisterData []*model.NeckRingOriginal
	// Mx guards the three slices while a batch is classified and stored.
	Mx *sync.RWMutex
}

// FrameId is the set of frame ids accepted as valid: for every tens group
// 0..11 the units 1-6 and 8 (1-6, 8, 11-16, 18, ..., 111-116, 118). Every key
// maps to itself; this file only tests key membership.
var FrameId = newFrameIDSet()

// newFrameIDSet builds the FrameId lookup table.
func newFrameIDSet() map[int32]int32 {
	units := [...]int32{1, 2, 3, 4, 5, 6, 8}
	ids := make(map[int32]int32, 12*len(units))
	for tens := int32(0); tens < 12; tens++ {
		for _, unit := range units {
			id := tens*10 + unit
			ids[id] = id
		}
	}
	return ids
}

// Subscribe consumes raw messages from msg, decodes them with MsgDataFormat
// and stores the resulting records in batches.
//
// Buffered records are written when at least 20 are pending, and additionally
// every 5 minutes so that a slow stream does not keep records in memory
// indefinitely. Subscribe flushes whatever is still pending and returns when
// ctx is cancelled, when msg is closed, or when the process receives
// SIGHUP, SIGINT, SIGTERM or SIGQUIT. A nil ctx is treated as
// context.Background().
//
// Messages that fail to decode are skipped. A batch that fails to store is
// logged and dropped.
func (e *Entry) Subscribe(ctx context.Context, msg <-chan []byte) {
	const (
		batchSize     = 20
		flushInterval = 5 * time.Minute
	)

	if ctx == nil {
		// Earlier versions never looked at ctx, so tolerate a nil one.
		ctx = context.Background()
	}

	DSMLog := &DataInsertNeckRingLog{Mx: &sync.RWMutex{}}
	batchList := make([]*model.NeckRingOriginal, 0, batchSize)

	// flush stores everything currently buffered, empties the buffer and
	// returns the normal records that were written.
	flush := func() []*model.NeckRingOriginal {
		stored := e.flushNeckRingBatch(DSMLog, batchList)
		batchList = batchList[:0]
		return stored
	}

	// os.Kill (SIGKILL) cannot be trapped, so it is not registered.
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	defer signal.Stop(sc)

	// A ticker rather than time.After: time.After fires only once, which
	// would leave the loop without any periodic flush afterwards.
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	for {
		select {
		case data, ok := <-msg:
			if !ok {
				// A closed channel would otherwise deliver nil messages forever.
				flush()
				return
			}
			newData, err := e.MsgDataFormat(data)
			if err != nil {
				continue
			}
			batchList = append(batchList, newData...)
			if len(batchList) >= batchSize {
				flush()
			}

		case <-ticker.C:
			zaplog.Info("subMsgChan-os", zap.Any("success", flush()))

		case <-sc:
			// Graceful exit: persist what is pending, then stop consuming.
			flush()
			return

		case <-ctx.Done():
			flush()
			return
		}
	}
}

// flushNeckRingBatch classifies every record of batch into DSMLog, persists
// the three groups with CreatedData and clears DSMLog again. It returns the
// normal records that were stored, or nil when batch is empty or storing
// failed (the failure is logged here).
func (e *Entry) flushNeckRingBatch(DSMLog *DataInsertNeckRingLog, batch []*model.NeckRingOriginal) []*model.NeckRingOriginal {
	if len(batch) == 0 {
		return nil
	}

	DSMLog.Mx.Lock()
	defer DSMLog.Mx.Unlock()
	// Registered after Unlock, so it runs first: the groups are cleared while
	// the lock is still held.
	defer func() {
		DSMLog.NeckRingOriginalData = nil
		DSMLog.NeckRingErrorData = nil
		DSMLog.NeckRingUnRegisterData = nil
	}()

	for _, record := range batch {
		_, knownFrame := FrameId[record.FrameId]
		switch {
		case !knownFrame:
			// Abnormal neck-ring data: unknown frame id.
			DSMLog.NeckRingErrorData = append(DSMLog.NeckRingErrorData, record)
		case !e.NeckRingIsBind(record.Imei):
			// Data from a neck ring that is not bound.
			DSMLog.NeckRingUnRegisterData = append(DSMLog.NeckRingUnRegisterData, record)
		default:
			// Normal neck-ring data.
			DSMLog.NeckRingOriginalData = append(DSMLog.NeckRingOriginalData, record)
		}
	}

	if err := e.CreatedData(DSMLog); err != nil {
		zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("dataList", DSMLog))
		return nil
	}
	return DSMLog.NeckRingOriginalData
}

// CreatedData inserts the error, normal and unregistered record groups of
// DSMLog inside a single database transaction, so either all non-empty groups
// are stored or none is. Empty groups are skipped. It does not take
// DSMLog.Mx; callers that share DSMLog must hold the lock themselves.
//
// It returns the stack-wrapped error of the first failing insert or of the
// transaction itself.
func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
	groups := [][]*model.NeckRingOriginal{
		DSMLog.NeckRingErrorData,
		DSMLog.NeckRingOriginalData,
		DSMLog.NeckRingUnRegisterData,
	}

	err := e.DB.Transaction(func(tx *gorm.DB) error {
		for _, records := range groups {
			if len(records) == 0 {
				continue
			}
			// Use tx, not e.DB, so the insert takes part in the transaction.
			if err := tx.Create(records).Error; err != nil {
				return xerr.WithStack(err)
			}
		}
		return nil
	})
	if err != nil {
		return xerr.WithStack(err)
	}
	return nil
}

// MsgDataFormat decodes one raw JSON message into a NeckRingWrapper and
// converts every packet in NeckRing.NeckPck into a model.NeckRingOriginal.
//
// It returns an empty slice when the message carries no packets. When msg is
// not valid JSON for NeckRingWrapper the error is logged and returned
// together with a nil slice.
func (e *Entry) MsgDataFormat(msg []byte) ([]*model.NeckRingOriginal, error) {
	neckLog := &NeckRingWrapper{}
	if err := json.Unmarshal(msg, neckLog); err != nil {
		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
		return nil, xerr.WithStack(err)
	}

	packets := neckLog.NeckRing.NeckPck
	batchList := make([]*model.NeckRingOriginal, 0, len(packets))
	for _, v := range packets {
		activeDate, hours := util.GetNeckRingActiveTimer(v.FrameId)

		// BAT's hexadecimal digits are re-read as a decimal number
		// (0x36 -> "36" -> 36). The parse error is deliberately ignored: a
		// value containing the hex digits a-f leaves voltage at 0.
		voltage, _ := strconv.ParseInt(strconv.FormatInt(int64(v.BAT), 16), 10, 64)

		// Frame ids ending in 8 are tagged as two-hour records, all others
		// as twenty-minute records.
		activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
		if v.FrameId%10 == 8 {
			activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
		}

		// NOTE(review): High is filled from Activitys and Active from High,
		// and Imei/ReceiveNumber from ECowId/Imei. The mapping is kept as it
		// was; confirm against the device protocol.
		batchList = append(batchList, &model.NeckRingOriginal{
			Uuid:           v.UUID,
			Imei:           v.ECowId,
			ActiveDate:     activeDate,
			Hours:          int32(hours),
			FrameId:        v.FrameId,
			Rumina:         v.RuMina,
			Intake:         v.Intake,
			Inactive:       v.Inactive,
			Other:          v.Other,
			High:           v.Activitys,
			Active:         v.High,
			Voltage:        int32(voltage),
			Version:        v.Sver,
			Remain:         v.Remain,
			ReceiveNumber:  v.Imei,
			ActiveDateType: activeDateTimeType,
		})
	}
	return batchList, nil
}