Yi 2 mesiacov pred
rodič
commit
886529b609

+ 3 - 3
config/app.develop.yaml

@@ -10,9 +10,9 @@ neck_ring_limit: 100
 store:
   show_sql: true
   driver_name: mysql
-  kpt_rw: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
-  kpt_migr: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
-
+  kpt_rw: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=600s&readTimeout=600s&writeTimeout=600s"
+  kpt_migr: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=600s&readTimeout=600s&writeTimeout=600s"
+  kpt_mqtt: "kpt_mqtt:kpt_mqtt!1234@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=600s&readTimeout=600s&writeTimeout=600s"
 redis_setting:
   cache_redis:
     addr: '47.92.95.119:6389'

+ 1 - 0
config/app.go

@@ -85,6 +85,7 @@ type StoreSetting struct {
 	ShowSQL    bool   `yaml:"show_sql" json:"show_sql"`
 	KptRW      string `yaml:"kpt_rw" json:"kpt_rw"`
 	KptMigr    string `yaml:"kpt_migr" json:"kpt_migr"`
+	KptMqtt    string `yaml:"kpt_mqtt" json:"kpt_mqtt"`
 }
 
 type RedisSetting struct {

+ 1 - 0
config/app.test.yaml

@@ -11,6 +11,7 @@ store:
   driver_name: mysql
   kpt_rw: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
   kpt_migr: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
+  kpt_mqtt: "kpt_mqtt:kpt_mqtt!1234@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=600s&readTimeout=600s&writeTimeout=600s"
 
 redis_setting:
   cache_redis:

+ 2 - 0
dep/dep.go

@@ -11,6 +11,7 @@ import (
 	"kpt-pasture/service/sso"
 	"kpt-pasture/service/wechat"
 	"kpt-pasture/store/kptstore"
+	"kpt-pasture/store/mqttstore"
 
 	"gitee.com/xuyiping_admin/pkg/di"
 )
@@ -38,5 +39,6 @@ func Options() []di.HubOption {
 		redis.Module,
 		crontab.Module,
 		moduleMqtt.Module,
+		mqttstore.Module,
 	}
 }

+ 1 - 1
dep/di_crontab.go

@@ -92,7 +92,7 @@ func EntryCrontab(dependency CrontabDependency) *cron.Crontab {
 		panic(err)
 	}
 
-	err = newCrontab.Bind("NeckRingOriginalMergeData", cs.NeckRingMerge, dependency.CrontabHub.NeckRingOriginalMergeData)
+	err = newCrontab.Bind("NeckRingMerge", cs.NeckRingMerge, dependency.CrontabHub.NeckRingOriginalMergeData)
 	if err != nil {
 		zaplog.Error("EntryCrontab", zap.Any("NeckRingOriginalMergeData", err))
 		panic(err)

+ 1 - 1
http/handler/cow/cow.go

@@ -81,5 +81,5 @@ func BehaviorCurve(c *gin.Context) {
 		apierr.ClassifiedAbort(c, err)
 		return
 	}
-	ginutil.JSONResp(c, res)
+	c.JSON(http.StatusOK, res)
 }

+ 7 - 6
model/cow.go

@@ -484,10 +484,11 @@ type CowBehaviorCurveResponse struct {
 }
 
 type CowBehaviorCurveData struct {
-	OriginalDataList []int32             `json:"originalDataList"` // 原始行为数据
-	ChangeDataList   []int32             `json:"changeDataList"`   // 变化数据
-	SumDataList      []int32             `json:"sumDataList"`      // 累计24小时数据
-	DataTimeList     []string            `json:"dataTimeList"`     // 时间数据
-	EstrusList       []string            `json:"estrusList"`       // 发情预警
-	EventList        map[string][]string `json:"eventList"`        // 事件数据
+	OriginalDataList []int32                                 `json:"originalDataList"` // 原始行为数据
+	ChangeDataList   []int32                                 `json:"changeDataList"`   // 变化数据
+	SumDataList      []int32                                 `json:"sumDataList"`      // 累计24小时数据
+	DataTimeList     []string                                `json:"dataTimeList"`     // 时间数据
+	EstrusList       map[pasturePb.EstrusLevel_Kind][]string `json:"estrusList"`       // 发情预警
+	EventList        map[string][]string                     `json:"eventList"`        // 事件数据
+	EventMap         map[pasturePb.EventType_Kind]string     `json:"eventMap"`         // 所有事件
 }

+ 2 - 1
model/neck_active_habit.go

@@ -106,8 +106,9 @@ func (n NeckActiveHabitSlice) ToPB(curveName string) *CowBehaviorCurveData {
 		ChangeDataList:   make([]int32, 0),
 		SumDataList:      make([]int32, 0),
 		DataTimeList:     make([]string, 0),
-		EstrusList:       make([]string, 0),
+		EstrusList:       make(map[pasturePb.EstrusLevel_Kind][]string, 0),
 		EventList:        make(map[string][]string),
+		EventMap:         make(map[pasturePb.EventType_Kind]string),
 	}
 
 	for _, v := range n {

+ 2 - 7
model/neck_ring_original.go

@@ -40,19 +40,14 @@ func (n *NeckRingOriginal) TableName() string {
 	return "neck_ring_original"
 }
 
-func NewNeckRingOriginal(neckLog *Behavior, pastureMap map[string]int64) *NeckRingOriginal {
+func NewNeckRingOriginal(neckLog *Behavior) *NeckRingOriginal {
 	activeDateTimeType := pasturePb.ActiveTimeType_Twenty_Minutes
-	pastureId := int64(0)
 	if neckLog.Frameid%10 == 8 {
 		activeDateTimeType = pasturePb.ActiveTimeType_Two_Hours
 	}
-
-	if pasture, ok := pastureMap[neckLog.Imei]; ok {
-		pastureId = pasture
-	}
 	return &NeckRingOriginal{
 		Uuid:            neckLog.UUID,
-		PastureId:       pastureId,
+		PastureId:       1,
 		NeckRingNumber:  fmt.Sprintf("%d", neckLog.Ecowid),
 		Frameid:         neckLog.Frameid,
 		Rumina:          neckLog.Rumina,

+ 1 - 0
model/system_role.go

@@ -25,6 +25,7 @@ const (
 	LayoutDate  = "20060102"
 	LayoutDate2 = "2006-01-02"
 	LayoutMonth = "2006-01"
+	LayoutHour  = "2006-01-02 15"
 )
 
 type SystemRoleSlice []*SystemRole

+ 16 - 2
module/backend/cow.go

@@ -2,6 +2,7 @@ package backend
 
 import (
 	"context"
+	"fmt"
 	"kpt-pasture/model"
 	"kpt-pasture/util"
 	"net/http"
@@ -187,9 +188,14 @@ func (s *StoreEntry) BehaviorCurve(ctx context.Context, req *pasturePb.CowBehavi
 	}
 
 	data := model.NeckActiveHabitSlice(neckActiveHabitList).ToPB(req.CurveName)
+	data.EventMap = s.EventTypeMap()
+
 	for _, v := range eventLogList {
 		eventAt := time.Unix(v.EventAt, 0)
-		data.EventList[v.EventTypeName] = append(data.EventList[v.EventTypeName], eventAt.Format(model.LayoutDate2))
+		if data.EventList[v.EventTypeName] == nil {
+			data.EventList[v.EventTypeName] = make([]string, 0)
+		}
+		data.EventList[v.EventTypeName] = append(data.EventList[v.EventTypeName], fmt.Sprintf("%s 09", eventAt.Format(model.LayoutDate2)))
 	}
 
 	// 发情数据
@@ -205,7 +211,15 @@ func (s *StoreEntry) BehaviorCurve(ctx context.Context, req *pasturePb.CowBehavi
 	}
 
 	for _, v := range estrusList {
-		data.EstrusList = append(data.EstrusList, v.EstrusStartDate)
+		if data.EstrusList[v.Level] == nil {
+			data.EstrusList[v.Level] = make([]string, 0)
+		}
+		if len(v.EstrusStartDate) > 0 {
+			parsedTime, _ := time.Parse(model.LayoutTime, v.EstrusStartDate)
+			// 格式化为到小时的字符串
+			hourStr := parsedTime.Format(model.LayoutHour)
+			data.EstrusList[v.Level] = append(data.EstrusList[v.Level], hourStr)
+		}
 	}
 
 	return &model.CowBehaviorCurveResponse{

+ 12 - 7
module/crontab/neck_ring_estrus.go

@@ -38,14 +38,19 @@ func (e *Entry) UpdateCowEstrus() (err error) {
 }
 
 func (e *Entry) EntryCowEstrus(pastureId int64) (err error) {
-	activeLow, _ := e.GetSystemConfigure(pastureId, model.ActiveLow)
-	activeMiddle, _ := e.GetSystemConfigure(pastureId, model.ActiveMiddle)
-	activeHigh, _ := e.GetSystemConfigure(pastureId, model.ActiveHigh)
-	xToday := &XToday{
-		ActiveLow:    int32(activeLow.Value),
-		ActiveMiddle: int32(activeMiddle.Value),
-		ActiveHigh:   int32(activeHigh.Value),
+	xToday := &XToday{}
+	systemConfigureList, err := e.GetSystemConfigure(pastureId)
+	for _, v := range systemConfigureList {
+		switch v.Name {
+		case model.ActiveLow:
+			xToday.ActiveLow = int32(v.Value)
+		case model.ActiveMiddle:
+			xToday.ActiveMiddle = int32(v.Value)
+		case model.ActiveHigh:
+			xToday.ActiveHigh = int32(v.Value)
+		}
 	}
+
 	if err = e.CowEstrusWarning(pastureId, xToday); err != nil {
 		zaplog.Error("EntryCowEstrus", zap.Any("CowEstrusWarning", err), zap.Any("xToday", xToday))
 	}

+ 63 - 60
module/crontab/neck_ring_handle.go

@@ -29,19 +29,22 @@ var (
 
 // NeckRingOriginalMergeData 把脖环数据合并成2个小时的
 func (e *Entry) NeckRingOriginalMergeData() error {
-	var err error
+	var (
+		err error
+	)
 	limit := e.Cfg.NeckRingLimit
 	if limit <= 0 {
 		limit = defaultLimit
 	}
 	newTime := time.Now()
-	createdAt := newTime.Add(-1 * time.Hour)
+	//createdAt := newTime.Add(-1 * time.Hour)
 
 	neckRingList := make([]*model.NeckRingOriginal, 0)
 	if err = e.DB.Model(new(model.NeckRingOriginal)).
-		Where("is_show <= ?", pasturePb.IsShow_No).
-		Where("created_at <= ?", createdAt.Unix()).
-		Order("neck_ring_number,active_date,frameid").
+		Where("is_show = ?", pasturePb.IsShow_No).
+		Where("neck_ring_number IN (?)", "10027").
+		//Where("created_at <= ?", createdAt.Unix()).
+		Order("active_date,neck_ring_number,frameid").
 		Limit(int(limit)).Find(&neckRingList).Error; err != nil {
 		return xerr.WithStack(err)
 	}
@@ -55,6 +58,9 @@ func (e *Entry) NeckRingOriginalMergeData() error {
 			e.DB.Model(new(model.NeckRingOriginal)).
 				Where("created_at < ?", newTime.AddDate(0, 0, -15).Unix()).
 				Delete(new(model.NeckRingOriginal))
+			e.DB.Model(new(model.NeckRingProcess)).
+				Where("created_at < ?", newTime.AddDate(0, 0, -15).Unix()).
+				Delete(new(model.NeckRingProcess))
 			// 活动数据删除6个月前的数据
 			e.DB.Model(new(model.NeckActiveHabit)).
 				Where("created_at < ?", newTime.AddDate(0, -6, 0).Unix()).
@@ -68,25 +74,31 @@ func (e *Entry) NeckRingOriginalMergeData() error {
 		return nil
 	}
 
-	// 批量插入和更新
-	var newNeckRingProcessList []*model.NeckRingProcess
-	for _, neckActiveHabit := range neckActiveHabitList {
+	for _, habit := range neckActiveHabitList {
 		//更新脖环牛只相关信息 新数据直接插入
-		historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(neckActiveHabit.NeckRingNumber, neckActiveHabit.HeatDate, neckActiveHabit.Frameid)
+		historyNeckActiveHabit, ct := e.IsExistNeckActiveHabit(habit.NeckRingNumber, habit.HeatDate, habit.Frameid)
 		if ct <= 0 {
-			if err = e.DB.Create(neckActiveHabit).Error; err != nil {
-				zaplog.Info("NeckRingOriginalMergeData-1", zap.Any("err", err), zap.Any("neckActiveHabit", neckActiveHabit))
+			if err = e.DB.Create(habit).Error; err != nil {
+				zaplog.Info("NeckRingOriginalMergeData-1",
+					zap.Any("err", err),
+					zap.Any("neckActiveHabit", habit),
+				)
 			}
-			newNeckRingProcess := model.NewNeckRingProcess(neckActiveHabit)
-			newNeckRingProcessList = append(newNeckRingProcessList, newNeckRingProcess)
-			if err = e.UpdateNeckRingOriginalIsShow(neckActiveHabit); err != nil {
-				zaplog.Error("NeckRingOriginalMergeData-3", zap.Any("err", err), zap.Any("neckActiveHabit", neckActiveHabit))
+			if err = e.UpdateNeckRingOriginalIsShow(habit); err != nil {
+				zaplog.Error("NeckRingOriginalMergeData-2",
+					zap.Any("err", err),
+					zap.Any("neckActiveHabit", habit),
+				)
 			}
 			continue
 		}
 
 		if historyNeckActiveHabit == nil || historyNeckActiveHabit.Id <= 0 {
-			zaplog.Error("NeckRingOriginalMergeData-4", zap.Any("historyNeckActiveHabit", historyNeckActiveHabit), zap.Any("ct", ct), zap.Any("neckActiveHabit", neckActiveHabit))
+			zaplog.Error("NeckRingOriginalMergeData-3",
+				zap.Any("ct", ct),
+				zap.Any("historyNeckActiveHabit", historyNeckActiveHabit),
+				zap.Any("neckActiveHabit", habit),
+			)
 			continue
 		}
 
@@ -97,16 +109,15 @@ func (e *Entry) NeckRingOriginalMergeData() error {
 				Select("rumina", "intake", "inactive", "gasp", "other", "high", "active").
 				Where("id = ?", historyNeckActiveHabit.Id).
 				Updates(newNeckActiveHabit).Error; err != nil {
-				zaplog.Error("NeckRingOriginalMergeData-5", zap.Any("historyNeckActiveHabit", historyNeckActiveHabit), zap.Any("ct", ct), zap.Any("newNeckActiveHabit", newNeckActiveHabit))
+				zaplog.Error("NeckRingOriginalMergeData-5",
+					zap.Any("err", err),
+					zap.Any("ct", ct),
+					zap.Any("historyNeckActiveHabit", historyNeckActiveHabit),
+					zap.Any("newNeckActiveHabit", newNeckActiveHabit),
+				)
 			}
 		}
 	}
-
-	if len(newNeckRingProcessList) > 0 {
-		if err = e.DB.Create(newNeckRingProcessList).Error; err != nil {
-			zaplog.Error("newNeckRingProcessList", zap.Any("err", err))
-		}
-	}
 	return nil
 }
 
@@ -142,7 +153,7 @@ func (e *Entry) recalculate(neckRingList []*model.NeckRingOriginal) []*model.Nec
 	// 算平均值
 	for k, v := range originalMapData {
 		// 过滤掉合并后不满6条数据
-		if v.RecordCount != 6 {
+		if v.RecordCount < 6 {
 			currMaxXframeId := util.FrameIdMapReverse[int32(currTime.Hour())]
 			activeDateString := fmt.Sprintf("%s %02d:00:00", v.ActiveDate, v.XframeId*2+1)
 			activeDate, _ := time.Parse(model.LayoutTime, activeDateString)
@@ -150,6 +161,10 @@ func (e *Entry) recalculate(neckRingList []*model.NeckRingOriginal) []*model.Nec
 				delete(originalMapData, k)
 			}
 		}
+		if v.RecordCount > 6 {
+			zaplog.Error("recalculate", zap.Any("k", k), zap.Any("v", v))
+			delete(originalMapData, k)
+		}
 		v.SumAvg()
 	}
 	return model.NeckRingOriginalMap(originalMapData).ForMatData()
@@ -174,24 +189,43 @@ func (e *Entry) againRecalculate(data *model.NeckActiveHabit) *model.NeckActiveH
 }
 
 func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
-	lastMaxHabitData, err := e.GetSystemConfigure(pastureId, model.MaxHabit)
+	// 获取这段执行数据内最大日期和最小日期
+	xToday := &XToday{}
+	systemConfigureList, err := e.GetSystemConfigure(pastureId)
 	if err != nil {
 		return xerr.WithStack(err)
 	}
+	for _, v := range systemConfigureList {
+		switch v.Name {
+		case model.MaxHabit:
+			xToday.LastMaxHabitId = v.Value
+		case model.High:
+			xToday.High = int32(v.Value)
+		case model.Rumina:
+			xToday.Rumina = int32(v.Value)
+		case model.XRuminaDisc:
+			xToday.XRuminaDisc = int32(v.Value)
+		case model.XChangeDiscount:
+			xToday.XChangeDiscount = int32(v.Value)
+		case model.WeeklyActive:
+			xToday.WeeklyActive = int32(v.Value)
+		}
+	}
 
-	lastMaxHabitId := lastMaxHabitData.Value
 	currMaxHabit := &model.NeckActiveHabit{}
 	if err = e.DB.Model(new(model.NeckActiveHabit)).
-		Where("id > ?", lastMaxHabitId).
+		Where("id > ?", xToday.LastMaxHabitId).
 		Where("pasture_id = ?", pastureId).
 		Where("is_show = ?", pasturePb.IsShow_No).
 		Order("id desc").First(currMaxHabit).Error; err != nil {
 		return xerr.WithStack(err)
 	}
 
-	if currMaxHabit.Id <= 0 || currMaxHabit.Id <= lastMaxHabitId {
+	if currMaxHabit.Id <= 0 || currMaxHabit.Id <= xToday.LastMaxHabitId {
 		return nil
 	}
+
+	xToday.CurrMaxHabitId = currMaxHabit.Id
 	defer func() {
 		// 更新最后一次执行的id值
 		if err == nil {
@@ -201,37 +235,6 @@ func (e *Entry) EntryUpdateActiveHabit(pastureId int64) (err error) {
 				Update("value", currMaxHabit.Id)
 		}
 	}()
-	// 获取这段执行数据内最大日期和最小日期
-	xToday := &XToday{}
-	activeLowest, err := e.GetSystemConfigure(pastureId, model.High)
-	if err != nil {
-		return xerr.WithStack(err)
-	}
-	ruminaLowest, err := e.GetSystemConfigure(pastureId, model.Rumina)
-	if err != nil {
-		return xerr.WithStack(err)
-	}
-	xRuminaDisc, err := e.GetSystemConfigure(pastureId, model.XRuminaDisc)
-	if err != nil {
-		return err
-	}
-	xChangeDiscount, err := e.GetSystemConfigure(pastureId, model.XChangeDiscount)
-	if err != nil {
-		return err
-	}
-
-	weeklyActive, err := e.GetSystemConfigure(pastureId, model.WeeklyActive)
-	if err != nil {
-		return xerr.WithStack(err)
-	}
-
-	xToday.High = int32(activeLowest.Value)
-	xToday.Rumina = int32(ruminaLowest.Value)
-	xToday.XRuminaDisc = int32(xRuminaDisc.Value)
-	xToday.XChangeDiscount = int32(xChangeDiscount.Value)
-	xToday.CurrMaxHabitId = currMaxHabit.Id
-	xToday.LastMaxHabitId = lastMaxHabitId
-	xToday.WeeklyActive = int32(weeklyActive.Value)
 
 	// 更新活动滤波
 	if err = e.FirstFilterUpdate(pastureId, xToday); err != nil {
@@ -290,7 +293,7 @@ func (e *Entry) FirstFilterUpdate(pastureId int64, xToDay *XToday) (err error) {
 		Where("is_show = ?", pasturePb.IsShow_No).
 		Where("change_filter = ?", model.InitChangeFilter).
 		Where(e.DB.Where("high >= ?", xToDay.High).Or("rumina >= ?", xToDay.Rumina)).
-		Order("neck_ring_number,heat_date,frameid").
+		Order("heat_date,neck_ring_number,frameid").
 		Limit(int(defaultLimit)).Find(&newNeckActiveHabitList).Error; err != nil {
 		return xerr.WithStack(err)
 	}

+ 8 - 35
module/crontab/sql.go

@@ -103,51 +103,24 @@ func (e *Entry) IsExistEventEstrus(pastureId, cowId int64) *model.EventEstrus {
 
 func (e *Entry) IsExistNeckActiveHabit(neckRingNumber, heatDate string, frameId int32) (*model.NeckActiveHabit, int64) {
 	count := int64(0)
-	neckRingProcess := &model.NeckRingProcess{}
-	if err := e.DB.Model(new(model.NeckRingProcess)).
+	neckActiveHabit := &model.NeckActiveHabit{}
+	if err := e.DB.Model(new(model.NeckActiveHabit)).
 		Where("neck_ring_number = ?", neckRingNumber).
-		Where("active_date = ?", heatDate).
+		Where("heat_date = ?", heatDate).
 		Where("frameid = ?", frameId).
 		Count(&count).
-		First(neckRingProcess).Error; err != nil {
+		First(neckActiveHabit).Error; err != nil {
 		return nil, 0
 	}
-
-	res := &model.NeckActiveHabit{}
-	if neckRingProcess != nil {
-		if neckRingProcess.HabitId > 0 {
-			if err := e.DB.Model(new(model.NeckActiveHabit)).
-				Where("id = ?", neckRingProcess.HabitId).
-				First(res).Error; err != nil {
-				return nil, 0
-			}
-		} else {
-			if err := e.DB.Model(new(model.NeckActiveHabit)).
-				Where("heat_date = ?", heatDate).
-				Where("neck_ring_number = ?", neckRingNumber).
-				Where("frameid = ?", frameId).
-				First(res).Error; err != nil {
-				return nil, 0
-			}
-
-			if err := e.DB.Model(new(model.NeckRingProcess)).
-				Where("id = ?", neckRingProcess.Id).
-				Where("frameid = ?", frameId).
-				Update("habit_id", res.Id).Error; err != nil {
-				return nil, 0
-			}
-		}
-	}
-	return res, count
+	return neckActiveHabit, count
 }
 
-func (e *Entry) GetSystemConfigure(pastureId int64, name string) (*model.SystemConfigure, error) {
-	res := &model.SystemConfigure{}
+func (e *Entry) GetSystemConfigure(pastureId int64) ([]*model.SystemConfigure, error) {
+	res := make([]*model.SystemConfigure, 0)
 	if err := e.DB.Model(new(model.SystemConfigure)).
-		Where("name = ?", name).
 		Where("pasture_id = ?", pastureId).
 		Where("is_show = ?", pasturePb.IsShow_Ok).
-		First(res).Error; err != nil {
+		Find(&res).Error; err != nil {
 		return nil, xerr.WithStack(err)
 	}
 	return res, nil

+ 2 - 2
module/mqtt/interface.go

@@ -2,7 +2,7 @@ package mqtt
 
 import (
 	"kpt-pasture/config"
-	"kpt-pasture/store/kptstore"
+	"kpt-pasture/store/mqttstore"
 
 	"gitee.com/xuyiping_admin/pkg/di"
 	"go.uber.org/dig"
@@ -17,7 +17,7 @@ func NewMqtt(entry Entry) DataHandle {
 type Entry struct {
 	dig.In
 	Cfg *config.AppConfig
-	DB  *kptstore.DB
+	DB  *mqttstore.DB
 }
 
 type DataHandle interface {

+ 21 - 5
module/mqtt/mqtt_handle.go

@@ -6,6 +6,8 @@ import (
 	"kpt-pasture/util"
 	"strconv"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/jinzhu/copier"
 
@@ -27,6 +29,7 @@ var (
 	}
 	pastureMqttMap       = make(map[string]int64)
 	isFindPastureMqttMap bool
+	mu                   sync.Mutex
 )
 
 func (e *Entry) NeckRingHandle(data []byte) {
@@ -47,12 +50,12 @@ func (e *Entry) FindPastureMqttMap() map[string]int64 {
 	if isFindPastureMqttMap {
 		return pastureMqttMap
 	}
-	appMqttList := make([]*model.AppMqtt, 0)
+	appMqttList := make([]*model.NeckRing, 0)
 	if err := e.DB.Model(new(model.AppMqtt)).Find(&appMqttList).Error; err != nil {
 		zaplog.Error("FindPastureMqttMap", zap.Any("err", err))
 	}
 	for _, v := range appMqttList {
-		pastureMqttMap[v.ReceiveNumber] = v.PastureId
+		pastureMqttMap[v.NeckRingNumber] = v.PastureId
 	}
 	isFindPastureMqttMap = true
 	return pastureMqttMap
@@ -105,8 +108,19 @@ func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 		}
 
 		if len(DSMLog.NeckRingOriginalData) > 0 {
-			if err := e.DB.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData).Error; err != nil {
-				return xerr.WithStack(err)
+			// 批量写入数据
+			if len(DSMLog.NeckRingOriginalData) > 50 {
+				if err := e.DB.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData[:50]).Error; err != nil {
+					return xerr.WithStack(err)
+				}
+				time.Sleep(100 * time.Millisecond)
+				if err := e.DB.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData[50:]).Error; err != nil {
+					return xerr.WithStack(err)
+				}
+			} else {
+				if err := e.DB.Model(new(model.NeckRingOriginal)).Create(DSMLog.NeckRingOriginalData).Error; err != nil {
+					return xerr.WithStack(err)
+				}
 			}
 		}
 		return nil
@@ -117,6 +131,8 @@ func (e *Entry) CreatedData(DSMLog *DataInsertNeckRingLog) error {
 }
 
 func (e *Entry) MsgDataFormat2(msg []byte) *DataInsertNeckRingLog {
+	mu.Lock()
+	defer mu.Unlock()
 	neckLogList := &model.NeckRingWrapper{}
 	if err := json.Unmarshal(msg, neckLogList); err != nil {
 		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
@@ -131,7 +147,7 @@ func (e *Entry) MsgDataFormat2(msg []byte) *DataInsertNeckRingLog {
 	pastureMqttMap = e.FindPastureMqttMap()
 
 	for _, neckLog := range neckLogList.NeckRing.NeckPck {
-		newOriginal := model.NewNeckRingOriginal(neckLog, pastureMqttMap)
+		newOriginal := model.NewNeckRingOriginal(neckLog)
 		if ok := util.IsValidFrameId(neckLog.Frameid); !ok {
 			var ed model.NeckRingError
 			if err := copier.Copy(&ed, &newOriginal); err != nil {

+ 3 - 3
store/kptstore/rw_store.go

@@ -55,9 +55,9 @@ func MustNewStore(cfg *config.AppConfig) *DB {
 		db.Logger.LogMode(logger.Info)
 	}
 	sqlDb, _ := db.DB()
-	sqlDb.SetMaxOpenConns(100)             // 设置最大空闲连接数为10个
-	sqlDb.SetConnMaxIdleTime(10)           // 设置可打开的最大连接数为 100 个
-	sqlDb.SetConnMaxLifetime(300000000000) // 5分钟 设置一个连接空闲后在多长时间内可复用,上面配置文件里设置的是300000000000, 因为Go的time.Duration底层类型是int64, 一秒是1000000000,这个大家可设置一个适当的时间,一般5~15分钟,不要太长
+	sqlDb.SetMaxOpenConns(100)                // 设置最大空闲连接数为10个
+	sqlDb.SetConnMaxIdleTime(10)              // 设置可打开的最大连接数为 100 个
+	sqlDb.SetConnMaxLifetime(5 * time.Minute) // 5分钟 设置一个连接空闲后在多长时间内可复用,上面配置文件里设置的是300000000000, 因为Go的time.Duration底层类型是int64, 一秒是1000000000,这个大家可设置一个适当的时间,一般5~15分钟,不要太长
 	if err = sqlDb.Ping(); err != nil {
 		panic(xerr.WithStack(err))
 	}

+ 89 - 0
store/mqttstore/mqtt_store.go

@@ -0,0 +1,89 @@
+package mqttstore
+
+import (
+	"kpt-pasture/config"
+	"time"
+
+	"gitee.com/xuyiping_admin/pkg/logger/logrus"
+	"gitee.com/xuyiping_admin/pkg/xerr"
+
+	"gorm.io/driver/mysql"
+	"gorm.io/gorm"
+	"gorm.io/gorm/logger"
+
+	"gitee.com/xuyiping_admin/pkg/di"
+)
+
+var Module = di.Options(
+	di.Provide(MustNewStore),
+)
+
+type DB struct {
+	*gorm.DB
+}
+
+func NewStore(engine *gorm.DB) *DB {
+	return &DB{engine}
+}
+
+type goRmLog struct {
+}
+
+func (g goRmLog) Printf(s string, i ...interface{}) {
+	logrus.Infof(s, i...)
+}
+
+var newLogger = logger.New(
+	goRmLog{},
+	logger.Config{
+		SlowThreshold: 5 * time.Second,
+		LogLevel:      logger.Info,
+	},
+)
+
+func MustNewStore(cfg *config.AppConfig) *DB {
+	db, err := gorm.Open(mysql.New(mysql.Config{
+		DriverName: cfg.StoreSetting.DriverName,
+		DSN:        cfg.StoreSetting.KptMqtt}),
+		&gorm.Config{Logger: newLogger},
+	)
+	if err != nil {
+		panic(xerr.WithStack(err))
+	}
+
+	if cfg.StoreSetting.ShowSQL {
+		db.Logger.LogMode(logger.Info)
+	}
+	sqlDb, _ := db.DB()
+	sqlDb.SetMaxOpenConns(100)                // 设置最大空闲连接数为10个
+	sqlDb.SetConnMaxIdleTime(10)              // 设置可打开的最大连接数为 100 个
+	sqlDb.SetConnMaxLifetime(5 * time.Minute) // 5分钟 设置一个连接空闲后在多长时间内可复用,上面配置文件里设置的是300000000000, 因为Go的time.Duration底层类型是int64, 一秒是1000000000,这个大家可设置一个适当的时间,一般5~15分钟,不要太长
+	if err = sqlDb.Ping(); err != nil {
+		panic(xerr.WithStack(err))
+	}
+
+	return NewStore(db)
+}
+
+func MustMigrateStore(cfg *config.AppConfig) *gorm.DB {
+	db, err := gorm.Open(mysql.New(mysql.Config{
+		DriverName: cfg.StoreSetting.DriverName,
+		DSN:        cfg.StoreSetting.KptMigr}),
+		&gorm.Config{Logger: newLogger},
+	)
+	if err != nil {
+		panic(xerr.WithStack(err))
+	}
+
+	if cfg.StoreSetting.ShowSQL {
+		db.Logger.LogMode(logger.Info)
+	}
+	sqlDb, _ := db.DB()
+	sqlDb.SetMaxOpenConns(100)             // 设置最大空闲连接数为10个
+	sqlDb.SetConnMaxIdleTime(10)           // 设置可打开的最大连接数为 100 个
+	sqlDb.SetConnMaxLifetime(300000000000) // 5分钟 设置一个连接空闲后在多长时间内可复用,上面配置文件里设置的是300000000000, 因为Go的time.Duration底层类型是int64, 一秒是1000000000,这个大家可设置一个适当的时间,一般5~15分钟,不要太长
+	if err = sqlDb.Ping(); err != nil {
+		panic(xerr.WithStack(err))
+	}
+	return db
+}

+ 4 - 1
util/util.go

@@ -439,7 +439,10 @@ func XFrameId(frameid int32) int32 {
 // FrameIds 获取FrameIds
 func FrameIds(xFrameId int32) []int32 {
 	frameIds := make([]int32, 0)
-	for i := 1; i <= 6; i++ {
+	for i := 1; i <= 8; i++ {
+		if i == 7 {
+			continue
+		}
 		frameIds = append(frameIds, xFrameId*10+int32(i))
 	}
 	return frameIds

+ 7 - 2
util/util_test.go

@@ -2,7 +2,6 @@ package util
 
 import (
 	"fmt"
-	"math"
 	"testing"
 	"time"
 
@@ -522,5 +521,11 @@ type XToday struct {
 }
 
 func Test_demo(t *testing.T) {
-	fmt.Println(math.Round(float64(44-0) / float64(0) * 100))
+	res := make([]int64, 0)
+	for i := 1; i <= 100; i++ {
+		res = append(res, int64(i))
+	}
+
+	fmt.Println(res[:50])
+	fmt.Println(res[50:])
 }