asynq.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package asynqsvc
  2. import (
  3. "context"
  4. "kpt-pasture/config"
  5. "time"
  6. "gitee.com/xuyiping_admin/pkg/di"
  7. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  8. "github.com/hibiken/asynq"
  9. "go.uber.org/zap"
  10. )
  11. var Module = di.Options(
  12. di.Provide(NewClient))
  13. type Server struct {
  14. *asynq.Server
  15. Mux *asynq.ServeMux
  16. }
  17. func initServer(setting *AsynqSetting) *Server {
  18. srv := asynq.NewServer(setting.Redis.RedisClientOpt(), setting.Config())
  19. mux := asynq.NewServeMux()
  20. mux.Use(asynqMiddlewareLog)
  21. return &Server{
  22. Server: srv,
  23. Mux: mux,
  24. }
  25. }
  26. func NewServer(cfg *config.AppConfig) *Server {
  27. return initServer(NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting))
  28. }
  29. func asynqError(ctx context.Context, task *asynq.Task, err error) {
  30. if err == nil {
  31. return
  32. }
  33. taskId, _ := asynq.GetTaskID(ctx)
  34. retriId, _ := asynq.GetRetryCount(ctx)
  35. maxRetry, _ := asynq.GetMaxRetry(ctx)
  36. zaplog.Error("asynqError maxRetry",
  37. zap.Any("taskId", taskId),
  38. zap.Any("taskType", task.Type()),
  39. zap.Any("retried exhausted", retriId),
  40. zap.Any("task.Payload", string(task.Payload())),
  41. zap.Any("maxRetry", maxRetry),
  42. zap.Any("Err", err.Error()),
  43. )
  44. }
  45. func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {
  46. return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
  47. start := time.Now()
  48. if err := h.ProcessTask(ctx, task); err != nil {
  49. return err
  50. }
  51. zaplog.Info("asynq middleware Process end", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))
  52. return nil
  53. })
  54. }