asynq.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package asynqsvc
  2. import (
  3. "context"
  4. "kpt-pasture/config"
  5. "time"
  6. "gitee.com/xuyiping_admin/pkg/di"
  7. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  8. "github.com/hibiken/asynq"
  9. "go.uber.org/zap"
  10. )
  11. var Module = di.Options(di.Provide(NewClient))
  12. type Server struct {
  13. *asynq.Server
  14. Mux *asynq.ServeMux
  15. }
  16. func initServer(setting *AsynqSetting) *Server {
  17. srv := asynq.NewServer(setting.Redis.RedisClientOpt(), setting.Config())
  18. mux := asynq.NewServeMux()
  19. mux.Use(asynqMiddlewareLog)
  20. return &Server{
  21. Server: srv,
  22. Mux: mux,
  23. }
  24. }
  25. func NewServer(cfg *config.AppConfig) *Server {
  26. return initServer(NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting))
  27. }
  28. func asynqError(ctx context.Context, task *asynq.Task, err error) {
  29. if err == nil {
  30. return
  31. }
  32. taskId, _ := asynq.GetTaskID(ctx)
  33. retriId, _ := asynq.GetRetryCount(ctx)
  34. maxRetry, _ := asynq.GetMaxRetry(ctx)
  35. zaplog.Error("asynqError maxRetry",
  36. zap.Any("taskId", taskId),
  37. zap.Any("taskType", task.Type()),
  38. zap.Any("retried exhausted", retriId),
  39. zap.Any("task.Payload", string(task.Payload())),
  40. zap.Any("maxRetry", maxRetry),
  41. zap.Any("Err", err.Error()),
  42. )
  43. }
  44. func asynqMiddlewareLog(h asynq.Handler) asynq.Handler {
  45. return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
  46. start := time.Now()
  47. if err := h.ProcessTask(ctx, task); err != nil {
  48. return err
  49. }
  50. zaplog.Info("asynqMiddlewareLog", zap.Any("task.Type", task.Type()), zap.Any("Time", time.Since(start)))
  51. return nil
  52. })
  53. }