Browse Source

milk: hall 阿菲金奶厅数据同步

Yi 3 days ago
parent
commit
94f80ee45b

+ 25 - 0
cmd/milk.go

@@ -0,0 +1,25 @@
+package cmd
+
+import (
+	"kpt-pasture/config"
+	"kpt-pasture/service/milk"
+
+	"github.com/spf13/cobra"
+)
+
+var MilkCmd = &cobra.Command{
+	Use:   "milk",
+	Short: "Milk Hall Data",
+	Run: func(cmd *cobra.Command, args []string) {
+		bootMilk(config.Options())
+	},
+}
+
+func bootMilk(cfg *config.AppConfig) {
+	findMilkDataService := milk.NewMilkDataService(cfg.MilkHall)
+	files, err := findMilkDataService.ReadFiles()
+	if err != nil {
+		panic(err)
+	}
+	findMilkDataService.Run(files)
+}

+ 1 - 0
cmd/root.go

@@ -24,4 +24,5 @@ func init() {
 	RootCmd.AddCommand(CrontabCmd)
 	RootCmd.AddCommand(ConsumerCmd)
 	RootCmd.AddCommand(MqttCmd)
+	RootCmd.AddCommand(MilkCmd)
 }

+ 13 - 1
config/app.develop.yaml

@@ -54,7 +54,6 @@ cron:
   update_pen_behavior: "0 45 * * * ?"  # 更新栏舍行行为数据
   update_pen_behavior_daily: "0 05 2 * * ?"  # 更新栏舍饲养监测数据
 
-
 mqtt:
   broker: "kptyun.com:1983"
   username: "kptmqtt"
@@ -68,3 +67,16 @@ mqtt:
   reconnect_interval: 10
   work_number: 1
   merge_data_ticker: 2   # 2分钟合并一次数据
+
+milk_hall:
+  base_path: "/milk_hall"
+  file_path:
+    - "/file1"
+    - "/file2"
+  back_path: "http://your-api-endpoint"
+  url_path: "/api/v1/milk/hall/original"
+  brand: "afimilk"
+  farm_id: "c7357ce63cc7dddf6aa75c5baa37c507"
+
+
+

+ 10 - 0
config/app.go

@@ -44,6 +44,7 @@ type AppConfig struct {
 	SideWorkSetting SideWorkSetting `yaml:"side_work_setting"`
 	CronSetting     CronSetting     `json:"cron_setting" yaml:"cron"`
 	Mqtt            MqttSetting     `json:"mqtt"`
+	MilkHall        MilkHallSetting `json:"milkHall"`
 }
 
 type CronSetting struct {
@@ -183,6 +184,15 @@ type MqttSetting struct {
 	MergeDataTicker   int    `json:"mergeDataTicker" yaml:"merge_data_ticker"`
 }
 
+type MilkHallSetting struct {
+	BasePath string   `yaml:"base_path"`
+	FilePath []string `yaml:"file_path"`
+	BackPath string   `json:"back_path"`
+	UrlPath  string   `yaml:"url_path"`
+	Brand    string   `yaml:"brand"`
+	FarmId   string   `yaml:"farm_id"`
+}
+
 func (a *AppConfig) Name() string {
 	return fmt.Sprintf("%s-%s", a.AppName, a.AppEnv)
 }

+ 31 - 0
http/handler/milk/milk.go

@@ -0,0 +1,31 @@
+package milk
+
+import (
+	"kpt-pasture/http/middleware"
+	"net/http"
+
+	operationPb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/operation"
+	"gitee.com/xuyiping_admin/pkg/apierr"
+	"gitee.com/xuyiping_admin/pkg/ginutil"
+	"github.com/gin-gonic/gin"
+)
+
+func ProcessOriginal(c *gin.Context) {
+	// 获取请求体数据
+	body, err := c.GetRawData()
+	if err != nil {
+		apierr.ClassifiedAbort(c, err)
+		return
+	}
+
+	err = middleware.Dependency(c).StoreEventHub.OpsService.MilkHallOriginal(c, body)
+	if err != nil {
+		apierr.ClassifiedAbort(c, err)
+		return
+	}
+	ginutil.JSONResp(c, &operationPb.CommonOK{
+		Code: http.StatusOK,
+		Msg:  "ok",
+		Data: &operationPb.Success{Success: true},
+	})
+}

+ 18 - 0
http/route/milk_api.go

@@ -0,0 +1,18 @@
+package route
+
+import (
+	"kpt-pasture/http/handler/milk"
+
+	"github.com/gin-gonic/gin"
+)
+
+func MilkManageAPI(opts ...func(engine *gin.Engine)) func(s *gin.Engine) {
+	return func(s *gin.Engine) {
+		for _, opt := range opts {
+			opt(s)
+		}
+		// milk API 组  奶厅数据管理
+		pastureRoute := authRouteGroup(s, "/api/v1/milk/")
+		pastureRoute.POST("/hall/original", milk.ProcessOriginal)
+	}
+}

+ 1 - 0
http/route/route.go

@@ -16,6 +16,7 @@ func HTTPServerRoute(opts ...func(engine *gin.Engine)) func(s *gin.Engine) {
 		DashboardApi(opts...),
 		WorkOrderAPI(opts...),
 		FilesManageAPI(opts...),
+		MilkManageAPI(opts...),
 		TestAPI(opts...),
 	}
 

+ 48 - 0
model/milk_hall.go

@@ -0,0 +1,48 @@
+package model
+
+const (
+	AFI = "afimilk"
+	GEA = "gea"
+)
+
+type MilkHallBody struct {
+	MilkHallNumber string `json:"milkHallNumber"`
+	Brand          string `json:"brand"`
+	Content        []byte `json:"content"`
+	FarmId         string `json:"farmId"`
+}
+
+// AFIMilkHallOriginal 阿菲金奶厅原始数据结构
+type AFIMilkHallOriginal struct {
+	UID                     int     `json:"uid"`
+	SessionNumber           int32   `json:"sessionNumber"`
+	SessionDate             string  `json:"sessionDate"`
+	StallNumber             int64   `json:"stallNumber"`
+	AnimalID                string  `json:"animalID,omitempty"` // 使用 omitempty 忽略空值
+	GroupNumber             int     `json:"groupNumber,omitempty"`
+	Yield                   int64   `json:"yield"`
+	Yield2Minutes           int     `json:"yield2Minutes,omitempty"`
+	Yield2MinutesPercentage float64 `json:"yield2MinutesPercentage,omitempty"`
+	MilkingTime             int     `json:"milkingTime,omitempty"`
+	FlowRate0To15           int64   `json:"flowRate_0To15,omitempty"`
+	FlowRate15To30          int64   `json:"flowRate_15To30,omitempty"`
+	FlowRate30To60          int64   `json:"flowRate_30To60,omitempty"`
+	FlowRate60To120         int64   `json:"flowRate_60To120,omitempty"`
+	HasKickoffs             bool    `json:"hasKickoffs"`
+	TotalAttachments        int32   `json:"totalAttachments,omitempty"`
+	IrregularDetachments    int     `json:"irregularDetachments,omitempty"`
+	Amt1                    int32   `json:"amt1,omitempty"`
+	DetachmentReason1       int     `json:"detachmentReason1,omitempty"`
+	Yield1                  int     `json:"yield1,omitempty"`
+	Amt2                    int     `json:"amt2,omitempty"`
+	DetachmentReason2       int     `json:"detachmentReason2,omitempty"`
+	Yield2                  int     `json:"yield2,omitempty"`
+	Amt3                    int     `json:"amt3,omitempty"`
+	DetachmentReason3       int     `json:"detachmentReason3,omitempty"`
+	Yield3                  int     `json:"yield3,omitempty"`
+	LowFlowRateTime         int32   `json:"lowFlowRateTime,omitempty"`
+	RemovalFlowRate         int     `json:"removalFlowRate,omitempty"`
+	PeakFlowRateTime        int     `json:"peakFlowRateTime,omitempty"`
+	PeakFlowRate            int32   `json:"peakFlowRate,omitempty"`
+	MilkingBimodality       bool    `json:"milkingBimodality,omitempty"`
+}

+ 98 - 0
model/milk_original.go

@@ -0,0 +1,98 @@
+package model
+
+type MilkOriginal struct {
+	Id               int64   `json:"id"`
+	PastureId        int64   `json:"pastureId"`
+	CowId            int64   `json:"cowId"`
+	EarNumber        string  `json:"earNumber"`
+	EleEarNumber     string  `json:"eleEarNumber"`
+	PenId            int32   `json:"penId"`
+	PenName          string  `json:"penName"`
+	MilkDate         string  `json:"milkDate"`
+	MilkWeight       int64   `json:"milkWeight"`
+	StartTime        string  `json:"startTime"`
+	InitialTime      string  `json:"initialTime"`
+	AttachTime       string  `json:"attachTime"`
+	AttachAdjustTime string  `json:"attachAdjustTime"`
+	DetacherTime     string  `json:"detacherTime"`
+	EndTime          string  `json:"endTime"`
+	DetacherAddress  int64   `json:"detacherAddress"`
+	Conductivity     int32   `json:"conductivity"`
+	CowActivity      int32   `json:"cowActivity"`
+	Source           int8    `json:"source"`
+	MilkHallNumber   string  `json:"milkHallNumber"`
+	Shifts           int32   `json:"shifts"`
+	Load             int32   `json:"load"`
+	Nattach          int32   `json:"nattach"`
+	RecognitionTime  string  `json:"recognitionTime"`
+	IsYieldLow       int8    `json:"isYieldLow"`
+	PeakFlow         float64 `json:"peakFlow"`
+	AvgFlow          float64 `json:"avgFlow"`
+	Duration         float64 `json:"duration"`
+	PearFlowTime     int32   `json:"pearFlowTime"`
+	LowFlowTime      int32   `json:"lowFlowTime"`
+	YieldPercentage  int32   `json:"yieldPercentage"`
+	ActualMilkTime   string  `json:"actualMilkTime"`
+	KickOffs         bool    `json:"kickOffs"`
+	Blocks           int8    `json:"blocks"`
+	Slips            int8    `json:"slips"`
+	ManualDetach     int8    `json:"manualDetach"`
+	TakeOffFlow      float64 `json:"takeOffFlow"`
+	LowMilkFlowPc    int64   `json:"lowMilkFlowPc"`
+	Flow0To15        int64   `json:"flow0To15"`
+	Flow15To30       int64   `json:"flow15To30"`
+	Flow30To60       int64   `json:"flow30To60"`
+	Flow60To120      int64   `json:"flow60To120"`
+	CreatedAt        int64   `json:"createdAt"`
+	UpdatedAt        int64   `json:"updatedAt"`
+}
+
+func (m *MilkOriginal) tableName() string {
+	return "milk_original"
+}
+
+func NewAFIMilkOriginal(pastureId int64, milkHallNumber string, req *AFIMilkHallOriginal) *MilkOriginal {
+	return &MilkOriginal{
+		PastureId:        pastureId,
+		CowId:            0,
+		EarNumber:        req.AnimalID,
+		EleEarNumber:     "",
+		PenId:            0,
+		PenName:          "",
+		MilkDate:         req.SessionDate,
+		MilkWeight:       req.Yield,
+		StartTime:        "",
+		InitialTime:      "",
+		AttachTime:       "",
+		AttachAdjustTime: "",
+		DetacherTime:     "",
+		EndTime:          "",
+		DetacherAddress:  req.StallNumber,
+		Conductivity:     req.Amt1,
+		CowActivity:      0,
+		Source:           0,
+		MilkHallNumber:   milkHallNumber,
+		Shifts:           req.SessionNumber,
+		Load:             0,
+		Nattach:          req.TotalAttachments,
+		RecognitionTime:  "",
+		IsYieldLow:       0,
+		PeakFlow:         float64(req.PeakFlowRate) / 1000,
+		PearFlowTime:     req.PeakFlowRate,
+		AvgFlow:          0,
+		Duration:         float64(req.MilkingTime) / 60,
+		LowFlowTime:      req.LowFlowRateTime,
+		YieldPercentage:  0,
+		ActualMilkTime:   "",
+		KickOffs:         req.HasKickoffs,
+		Blocks:           0,
+		Slips:            0,
+		ManualDetach:     0,
+		TakeOffFlow:      float64(req.RemovalFlowRate) / 1000,
+		LowMilkFlowPc:    0,
+		Flow0To15:        req.FlowRate0To15,
+		Flow15To30:       req.FlowRate15To30,
+		Flow30To60:       req.FlowRate30To60,
+		Flow60To120:      req.FlowRate60To120,
+	}
+}

+ 5 - 0
module/backend/interface.go

@@ -52,6 +52,7 @@ type KptService interface {
 	AnalyseService       // 分析相关
 	DashboardService     // 牧场统计相关
 	WorkService          // 日常工作相关
+	MilkHallService      // 奶厅数据相关
 	TestService          // 测试相关
 }
 
@@ -321,6 +322,10 @@ type WorkService interface {
 	EstrusOrAbortionCowList(ctx context.Context, req *pasturePb.WarningItemsRequest, pagination *pasturePb.PaginationModel) (*pasturePb.EstrusResponse, error)
 }
 
+type MilkHallService interface {
+	MilkHallOriginal(ctx context.Context, req []byte) error
+}
+
 type TestService interface {
 	CowNeckRingNumberBound(ctx context.Context, pagination *pasturePb.PaginationModel) error
 	CowNeckRingNumberBound2(ctx context.Context, pagination *pasturePb.PaginationModel) error

+ 27 - 0
module/backend/milk_afimilk.go

@@ -0,0 +1,27 @@
+package backend
+
+import (
+	"context"
+	"encoding/json"
+	"kpt-pasture/model"
+
+	"gitee.com/xuyiping_admin/pkg/xerr"
+)
+
+// AFIMILK 阿菲金品牌实现
+type AFIMILK struct {
+	store *StoreEntry
+}
+
+func (a *AFIMILK) SaveData(ctx context.Context, body *model.MilkHallBody) error {
+	content := body.Content
+	if len(content) <= 0 {
+		return nil
+	}
+
+	afiMilkList := make([]*model.AFIMilkHallOriginal, 0)
+	if err := json.Unmarshal(content, &afiMilkList); err != nil {
+		return xerr.WithStack(err)
+	}
+	return nil
+}

+ 16 - 0
module/backend/milk_gea.go

@@ -0,0 +1,16 @@
+package backend
+
+import (
+	"context"
+	"kpt-pasture/model"
+)
+
+// GEAMILK gea品牌实现
+type GEAMILK struct {
+	store *StoreEntry
+}
+
+func (a *GEAMILK) SaveData(ctx context.Context, body *model.MilkHallBody) error {
+	// TODO: 实现品牌1的数据保存逻辑
+	return nil
+}

+ 33 - 0
module/backend/milk_hall.go

@@ -0,0 +1,33 @@
+package backend
+
+import (
+	"context"
+	"encoding/json"
+	"kpt-pasture/model"
+
+	"gitee.com/xuyiping_admin/pkg/xerr"
+)
+
+// MilkBrand 奶厅品牌接口
+type MilkBrand interface {
+	SaveData(ctx context.Context, body *model.MilkHallBody) error
+}
+
+func (s *StoreEntry) MilkHallOriginal(ctx context.Context, req []byte) error {
+	body := &model.MilkHallBody{}
+	if err := json.Unmarshal(req, body); err != nil {
+		return xerr.WithStack(err)
+	}
+	var milkBrand MilkBrand
+	switch body.Brand {
+	case model.AFI:
+		milkBrand = &AFIMILK{store: s}
+	case model.GEA:
+		milkBrand = &GEAMILK{store: s}
+	default:
+		return xerr.Customf("不支持的品牌: %s", body.Brand)
+	}
+
+	// 调用品牌的 SaveData 方法
+	return milkBrand.SaveData(ctx, body)
+}

+ 161 - 0
service/milk/find_date_service.go

@@ -0,0 +1,161 @@
+package milk
+
+import (
+	"bytes"
+	"encoding/json"
+	"io"
+	"kpt-pasture/config"
+	"kpt-pasture/model"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	"go.uber.org/zap"
+)
+
+type FindDataService struct {
+	fileCache sync.Map // 用于存储文件信息
+	cfg       config.MilkHallSetting
+}
+
+// FileInfo 存储文件信息
+type FileInfo struct {
+	ModTime time.Time
+	Size    int64
+}
+
+func NewMilkDataService(cfg config.MilkHallSetting) *FindDataService {
+	return &FindDataService{
+		cfg: cfg,
+	}
+}
+
+// ReadFiles 读取指定路径下的所有文件
+func (s *FindDataService) ReadFiles() ([]string, error) {
+	var files []string
+	for _, path := range s.cfg.FilePath {
+		fullPath := filepath.Join(s.cfg.BasePath, path)
+		if err := filepath.Walk(fullPath, func(path string, info os.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+			if !info.IsDir() {
+				files = append(files, path)
+			}
+			return nil
+		}); err != nil {
+			return files, err
+		}
+	}
+
+	return files, nil
+}
+
+func (s *FindDataService) Run(fileList []string) {
+	ticker := time.NewTicker(30 * time.Second) // 增加间隔到30秒
+	defer ticker.Stop()
+
+	// 创建退出通道
+	done := make(chan struct{})
+	defer close(done)
+
+	for {
+		select {
+		case <-ticker.C:
+			for _, file := range fileList {
+				// 检查文件是否存在
+				info, err := os.Stat(file)
+				if err != nil {
+					continue
+				}
+
+				// 检查文件大小,如果超过10MB则跳过
+				/*if info.Size() > 10*1024*1024 {
+					continue
+				}*/
+
+				// 检查文件是否被修改
+				if cachedInfo, ok := s.fileCache.Load(file); ok {
+					if cachedInfo.(FileInfo).ModTime.Equal(info.ModTime()) {
+						continue // 文件未修改,跳过
+					}
+				}
+
+				if len(info.Name()) < 30 || strings.Index(info.Name(), "done.") < 0 || strings.Index(info.Name(), "Parlor.json") < 0 {
+					continue
+				}
+				// 读取文件内容
+				content, err := os.ReadFile(file)
+				if err != nil {
+					continue
+				}
+
+				// 更新文件缓存
+				s.fileCache.Store(file, FileInfo{
+					ModTime: info.ModTime(),
+					Size:    info.Size(),
+				})
+
+				s.HandleFileContent(content)
+			}
+		case <-done:
+			return
+		}
+	}
+}
+
+func (s *FindDataService) HandleFileContent(contentBody []byte) {
+	requestBody := model.MilkHallBody{
+		Brand:   s.cfg.Brand,
+		FarmId:  s.cfg.FarmId,
+		Content: contentBody,
+	}
+	// 将内容转换为 JSON 字节
+	jsonData, err := json.Marshal(requestBody)
+	if err != nil {
+		zaplog.Error("HandleFileContent", zap.Any("error", err))
+		return
+	}
+
+	// 创建 HTTP 请求
+	req, err := http.NewRequest("POST", strings.Join([]string{s.cfg.BackPath, s.cfg.UrlPath}, ""), bytes.NewBuffer(jsonData))
+	if err != nil {
+		zaplog.Error("HandleFileContent", zap.Any("error", err))
+		return
+	}
+
+	// 设置请求头
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Authorization", "Bearer your-token") // 如果需要认证,替换为实际的 token
+
+	// 创建 HTTP 客户端
+	client := &http.Client{
+		Timeout: 10 * time.Second,
+	}
+
+	// 发送请求
+	resp, err := client.Do(req)
+	if err != nil {
+		zaplog.Error("HandleFileContent", zap.Any("error", err))
+		return
+	}
+	defer resp.Body.Close()
+
+	// 读取响应
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		zaplog.Error("HandleFileContent", zap.Any("error", err))
+		return
+	}
+
+	// 检查响应状态码
+	if resp.StatusCode != http.StatusOK {
+		zaplog.Error("HandleFileContent", zap.Any("error", err))
+		return
+	}
+	zaplog.Info("HandleFileContent", zap.Any("请求成功,响应内容:", string(body)))
+}