| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 | package asynqsvcimport (	"context"	"kpt-pasture/config"	"time"	"gitee.com/xuyiping_admin/pkg/di"	"gitee.com/xuyiping_admin/pkg/logger/zaplog"	"github.com/hibiken/asynq"	"go.uber.org/zap")var Module = di.Options(	di.Provide(NewClient))type Server struct {	*asynq.Server	Mux *asynq.ServeMux}func initServer(setting *AsynqSetting) *Server {	srv := asynq.NewServer(setting.Redis.RedisClientOpt(), setting.Config())	mux := asynq.NewServeMux()	mux.Use(asynqMiddlewareLog)	return &Server{		Server: srv,		Mux:    mux,	}}func NewServer(cfg *config.AppConfig) *Server {	return initServer(NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting))}func asynqError(ctx context.Context, task *asynq.Task, err error) {	if err == nil {		return	}	taskId, _ := asynq.GetTaskID(ctx)	retriId, _ := asynq.GetRetryCount(ctx)	maxRetry, _ := asynq.GetMaxRetry(ctx)	zaplog.Error("asynqError maxRetry",		zap.Any("taskId", taskId),		zap.Any("taskType", task.Type()),		zap.Any("retried exhausted", retriId),		zap.Any("task.Payload", string(task.Payload())),		zap.Any("maxRetry", maxRetry),		zap.Any("Err", err.Error()),	)}func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {	return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {		start := time.Now()		if err := h.ProcessTask(ctx, task); err != nil {			return err		}		zaplog.Info("asynq middleware Process end", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))		return nil	})}
 |