1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- package asynqsvc
- import (
- "context"
- "kpt-pasture/config"
- "time"
- "gitee.com/xuyiping_admin/pkg/di"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- "github.com/hibiken/asynq"
- "go.uber.org/zap"
- )
- var Module = di.Options(
- di.Provide(NewClient))
- type Server struct {
- *asynq.Server
- Mux *asynq.ServeMux
- }
- func initServer(setting *AsynqSetting) *Server {
- srv := asynq.NewServer(setting.Redis.RedisClientOpt(), setting.Config())
- mux := asynq.NewServeMux()
- mux.Use(asynqMiddlewareLog)
- return &Server{
- Server: srv,
- Mux: mux,
- }
- }
- func NewServer(cfg *config.AppConfig) *Server {
- return initServer(NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting))
- }
- func asynqError(ctx context.Context, task *asynq.Task, err error) {
- if err == nil {
- return
- }
- taskId, _ := asynq.GetTaskID(ctx)
- retriId, _ := asynq.GetRetryCount(ctx)
- maxRetry, _ := asynq.GetMaxRetry(ctx)
- zaplog.Error("asynqError maxRetry",
- zap.Any("taskId", taskId),
- zap.Any("taskType", task.Type()),
- zap.Any("retried exhausted", retriId),
- zap.Any("task.Payload", string(task.Payload())),
- zap.Any("maxRetry", maxRetry),
- zap.Any("Err", err.Error()),
- )
- }
- func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {
- return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
- start := time.Now()
- if err := h.ProcessTask(ctx, task); err != nil {
- return err
- }
- zaplog.Info("asynq middleware Process end", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))
- return nil
- })
- }
|