// Package asynqsvc wires up the asynq task server used by the application,
// together with its logging helpers.
package asynqsvc

import (
	"context"
	"time"

	"kpt-pasture/config"

	"gitee.com/xuyiping_admin/pkg/di"
	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	"github.com/hibiken/asynq"
	"go.uber.org/zap"
)

// Module is the dependency-injection option set exported by this package.
// It registers NewClient as a provider.
// NOTE(review): NewClient is not defined in this file; presumably it lives in
// a sibling file of the package — confirm.
var Module = di.Options(di.Provide(NewClient))

// Server bundles an asynq server with the mux that routes tasks to handlers.
// The asynq server is embedded, so its methods are available directly on
// Server.
type Server struct {
	*asynq.Server

	// Mux is the handler multiplexer created alongside the server. The
	// logging middleware is already installed on it by initServer.
	Mux *asynq.ServeMux
}

// initServer builds a Server from the given setting: the asynq server is
// created from the setting's Redis client options and config, and a fresh
// mux is created with asynqMiddlewareLog installed.
func initServer(setting *AsynqSetting) *Server {
	s := &Server{
		Server: asynq.NewServer(setting.Redis.RedisClientOpt(), setting.Config()),
		Mux:    asynq.NewServeMux(),
	}
	s.Mux.Use(asynqMiddlewareLog)
	return s
}

// NewServer creates a Server from the application configuration, using the
// asynq section of the side-work settings.
func NewServer(cfg *config.AppConfig) *Server {
	setting := NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting)
	return initServer(setting)
}

// asynqError logs the details of a failed task at error level: task id, task
// type, current retry count, payload (as a string), max retry and the error
// message. It does nothing when err is nil.
func asynqError(ctx context.Context, task *asynq.Task, err error) {
	if err == nil {
		return
	}

	// The "ok" results are deliberately discarded: this is best-effort
	// logging, so whatever values the helpers return are logged as-is.
	id, _ := asynq.GetTaskID(ctx)
	retried, _ := asynq.GetRetryCount(ctx)
	limit, _ := asynq.GetMaxRetry(ctx)

	zaplog.Error("asynqError maxRetry",
		zap.String("taskId", id),
		zap.String("taskType", task.Type()),
		zap.Int("retried exhausted", retried),
		zap.String("task.Payload", string(task.Payload())),
		zap.Int("maxRetry", limit),
		zap.String("Err", err.Error()),
	)
}

// asynqMiddlewareLog wraps a handler so that every successfully processed
// task is logged at info level with its type and elapsed time. When the
// wrapped handler fails, the error is returned unchanged and nothing is
// logged here.
func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {
	timed := func(ctx context.Context, task *asynq.Task) error {
		begin := time.Now()

		err := h.ProcessTask(ctx, task)
		if err == nil {
			zaplog.Info("asynqMiddlewareLog",
				zap.String("task.Type", task.Type()),
				zap.Duration("Time", time.Since(begin)),
			)
		}
		return err
	}
	return asynq.HandlerFunc(timed)
}