package milk import ( "bytes" "encoding/json" "io" "kpt-pasture/config" "kpt-pasture/model" "net/http" "os" "path/filepath" "strings" "sync" "time" "gitee.com/xuyiping_admin/pkg/logger/zaplog" "go.uber.org/zap" ) type FindDataService struct { fileCache sync.Map // 用于存储文件信息 cfg config.MilkHallSetting } // FileInfo 存储文件信息 type FileInfo struct { ModTime time.Time Size int64 } func NewMilkDataService(cfg config.MilkHallSetting) *FindDataService { return &FindDataService{ cfg: cfg, } } // ReadFiles 读取指定路径下的所有文件 func (s *FindDataService) ReadFiles() ([]string, error) { var files []string for _, path := range s.cfg.FilePath { fullPath := filepath.Join(s.cfg.BasePath, path) if err := filepath.Walk(fullPath, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { files = append(files, path) } return nil }); err != nil { return files, err } } return files, nil } func (s *FindDataService) Run(fileList []string) { ticker := time.NewTicker(30 * time.Second) // 增加间隔到30秒 defer ticker.Stop() // 创建退出通道 done := make(chan struct{}) defer close(done) for { select { case <-ticker.C: for _, file := range fileList { // 检查文件是否存在 info, err := os.Stat(file) if err != nil { continue } // 检查文件大小,如果超过10MB则跳过 /*if info.Size() > 10*1024*1024 { continue }*/ // 检查文件是否被修改 if cachedInfo, ok := s.fileCache.Load(file); ok { if cachedInfo.(FileInfo).ModTime.Equal(info.ModTime()) { continue // 文件未修改,跳过 } } if len(info.Name()) < 30 || strings.Index(info.Name(), "done.") < 0 || strings.Index(info.Name(), "Parlor.json") < 0 { continue } // 读取文件内容 content, err := os.ReadFile(file) if err != nil { continue } // 更新文件缓存 s.fileCache.Store(file, FileInfo{ ModTime: info.ModTime(), Size: info.Size(), }) s.HandleFileContent(content) } case <-done: return } } } func (s *FindDataService) HandleFileContent(contentBody []byte) { requestBody := model.MilkHallBody{ Brand: s.cfg.Brand, FarmId: s.cfg.FarmId, Content: contentBody, } // 将内容转换为 JSON 字节 jsonData, err := json.Marshal(requestBody) if err != nil { zaplog.Error("HandleFileContent", zap.Any("error", err)) return } // 创建 HTTP 请求 req, err := http.NewRequest("POST", strings.Join([]string{s.cfg.BackPath, s.cfg.UrlPath}, ""), bytes.NewBuffer(jsonData)) if err != nil { zaplog.Error("HandleFileContent", zap.Any("error", err)) return } // 设置请求头 req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer your-token") // 如果需要认证,替换为实际的 token // 创建 HTTP 客户端 client := &http.Client{ Timeout: 10 * time.Second, } // 发送请求 resp, err := client.Do(req) if err != nil { zaplog.Error("HandleFileContent", zap.Any("error", err)) return } defer resp.Body.Close() // 读取响应 body, err := io.ReadAll(resp.Body) if err != nil { zaplog.Error("HandleFileContent", zap.Any("error", err)) return } // 检查响应状态码 if resp.StatusCode != http.StatusOK { zaplog.Error("HandleFileContent", zap.Any("error", err)) return } zaplog.Info("HandleFileContent", zap.Any("请求成功,响应内容:", string(body))) }