123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package milk
- import (
- "bytes"
- "encoding/json"
- "io"
- "kpt-pasture/config"
- "kpt-pasture/model"
- "net/http"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- "go.uber.org/zap"
- )
- type FindDataService struct {
- fileCache sync.Map // 用于存储文件信息
- cfg config.MilkHallSetting
- }
// FileInfo is the snapshot of a file's metadata kept in the service's
// file cache.
type FileInfo struct {
	// ModTime is the file's modification time when it was last processed.
	ModTime time.Time
	// Size is the file's length in bytes when it was last processed.
	Size int64
}
- func NewMilkDataService(cfg config.MilkHallSetting) *FindDataService {
- return &FindDataService{
- cfg: cfg,
- }
- }
- // ReadFiles 读取指定路径下的所有文件
- func (s *FindDataService) ReadFiles() ([]string, error) {
- var files []string
- for _, path := range s.cfg.FilePath {
- fullPath := filepath.Join(s.cfg.BasePath, path)
- if err := filepath.Walk(fullPath, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if !info.IsDir() {
- files = append(files, path)
- }
- return nil
- }); err != nil {
- return files, err
- }
- }
- return files, nil
- }
- func (s *FindDataService) Run(fileList []string) {
- ticker := time.NewTicker(30 * time.Second) // 增加间隔到30秒
- defer ticker.Stop()
- // 创建退出通道
- done := make(chan struct{})
- defer close(done)
- for {
- select {
- case <-ticker.C:
- for _, file := range fileList {
- // 检查文件是否存在
- info, err := os.Stat(file)
- if err != nil {
- continue
- }
- // 检查文件大小,如果超过10MB则跳过
- /*if info.Size() > 10*1024*1024 {
- continue
- }*/
- // 检查文件是否被修改
- if cachedInfo, ok := s.fileCache.Load(file); ok {
- if cachedInfo.(FileInfo).ModTime.Equal(info.ModTime()) {
- continue // 文件未修改,跳过
- }
- }
- if len(info.Name()) < 30 || strings.Index(info.Name(), "done.") < 0 || strings.Index(info.Name(), "Parlor.json") < 0 {
- continue
- }
- // 读取文件内容
- content, err := os.ReadFile(file)
- if err != nil {
- continue
- }
- // 更新文件缓存
- s.fileCache.Store(file, FileInfo{
- ModTime: info.ModTime(),
- Size: info.Size(),
- })
- s.HandleFileContent(content)
- }
- case <-done:
- return
- }
- }
- }
- func (s *FindDataService) HandleFileContent(contentBody []byte) {
- requestBody := model.MilkHallBody{
- Brand: s.cfg.Brand,
- FarmId: s.cfg.FarmId,
- Content: contentBody,
- }
- // 将内容转换为 JSON 字节
- jsonData, err := json.Marshal(requestBody)
- if err != nil {
- zaplog.Error("HandleFileContent", zap.Any("error", err))
- return
- }
- // 创建 HTTP 请求
- req, err := http.NewRequest("POST", strings.Join([]string{s.cfg.BackPath, s.cfg.UrlPath}, ""), bytes.NewBuffer(jsonData))
- if err != nil {
- zaplog.Error("HandleFileContent", zap.Any("error", err))
- return
- }
- // 设置请求头
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("Authorization", "Bearer your-token") // 如果需要认证,替换为实际的 token
- // 创建 HTTP 客户端
- client := &http.Client{
- Timeout: 10 * time.Second,
- }
- // 发送请求
- resp, err := client.Do(req)
- if err != nil {
- zaplog.Error("HandleFileContent", zap.Any("error", err))
- return
- }
- defer resp.Body.Close()
- // 读取响应
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- zaplog.Error("HandleFileContent", zap.Any("error", err))
- return
- }
- // 检查响应状态码
- if resp.StatusCode != http.StatusOK {
- zaplog.Error("HandleFileContent", zap.Any("error", err))
- return
- }
- zaplog.Info("HandleFileContent", zap.Any("请求成功,响应内容:", string(body)))
- }
|