12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- package asynqsvc
- import (
- "context"
- "kpt-pasture/config"
- "time"
- "gitee.com/xuyiping_admin/pkg/di"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- "github.com/hibiken/asynq"
- "go.uber.org/zap"
- )
- var Module = di.Options(di.Provide(NewClient))
- type Server struct {
- *asynq.Server
- Mux *asynq.ServeMux
- }
- func initServer(setting *AsynqSetting) *Server {
- srv := asynq.NewServer(setting.Redis.RedisClientOpt(), setting.Config())
- mux := asynq.NewServeMux()
- mux.Use(asynqMiddlewareLog)
- return &Server{
- Server: srv,
- Mux: mux,
- }
- }
- func NewServer(cfg *config.AppConfig) *Server {
- return initServer(NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting))
- }
- func asynqError(ctx context.Context, task *asynq.Task, err error) {
- if err == nil {
- return
- }
- taskId, _ := asynq.GetTaskID(ctx)
- retriId, _ := asynq.GetRetryCount(ctx)
- maxRetry, _ := asynq.GetMaxRetry(ctx)
- zaplog.Error("asynqError maxRetry",
- zap.Any("taskId", taskId),
- zap.Any("taskType", task.Type()),
- zap.Any("retried exhausted", retriId),
- zap.Any("task.Payload", string(task.Payload())),
- zap.Any("maxRetry", maxRetry),
- zap.Any("Err", err.Error()),
- )
- }
- func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {
- return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
- start := time.Now()
- if err := h.ProcessTask(ctx, task); err != nil {
- return err
- }
- zaplog.Info("asynqMiddlewareLog", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))
- return nil
- })
- }
|