| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 | 
							- package asynqsvc
 
- import (
 
- 	"context"
 
- 	"kpt-pasture/config"
 
- 	"time"
 
- 	"gitee.com/xuyiping_admin/pkg/di"
 
- 	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
 
- 	"github.com/hibiken/asynq"
 
- 	"go.uber.org/zap"
 
- )
 
- var Module = di.Options(di.Provide(NewClient))
 
- type Server struct {
 
- 	*asynq.Server
 
- 	Mux *asynq.ServeMux
 
- }
 
- func initServer(setting *AsynqSetting) *Server {
 
- 	srv := asynq.NewServer(setting.Redis.RedisClientOpt(), setting.Config())
 
- 	mux := asynq.NewServeMux()
 
- 	mux.Use(asynqMiddlewareLog)
 
- 	return &Server{
 
- 		Server: srv,
 
- 		Mux:    mux,
 
- 	}
 
- }
 
- func NewServer(cfg *config.AppConfig) *Server {
 
- 	return initServer(NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting))
 
- }
 
- func asynqError(ctx context.Context, task *asynq.Task, err error) {
 
- 	if err == nil {
 
- 		return
 
- 	}
 
- 	taskId, _ := asynq.GetTaskID(ctx)
 
- 	retriId, _ := asynq.GetRetryCount(ctx)
 
- 	maxRetry, _ := asynq.GetMaxRetry(ctx)
 
- 	zaplog.Error("asynqError maxRetry",
 
- 		zap.Any("taskId", taskId),
 
- 		zap.Any("taskType", task.Type()),
 
- 		zap.Any("retried exhausted", retriId),
 
- 		zap.Any("task.Payload", string(task.Payload())),
 
- 		zap.Any("maxRetry", maxRetry),
 
- 		zap.Any("Err", err.Error()),
 
- 	)
 
- }
 
- func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {
 
- 	return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
 
- 		start := time.Now().Local()
 
- 		if err := h.ProcessTask(ctx, task); err != nil {
 
- 			return err
 
- 		}
 
- 		zaplog.Info("asynqMiddlewareLog", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))
 
- 		return nil
 
- 	})
 
- }
 
 
  |