client.go 989 B

123456789101112131415161718192021222324252627282930313233343536373839
  1. package asynqsvc
  2. import (
  3. "context"
  4. "kpt-pasture/config"
  5. "gitee.com/xuyiping_admin/pkg/logger/zaplog"
  6. "gitee.com/xuyiping_admin/pkg/xerr"
  7. "github.com/hibiken/asynq"
  8. "go.uber.org/zap"
  9. )
  10. type Option = asynq.Option
  11. type Client interface {
  12. CtxEnqueue(ctx context.Context, task *asynq.Task, ops ...Option) (*asynq.TaskInfo, error)
  13. }
  14. func NewClient(cfg *config.AppConfig) Client {
  15. setting := NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting)
  16. client := asynq.NewClient(setting.Redis.RedisClientOpt())
  17. return &ClientEntry{client}
  18. }
  19. type ClientEntry struct {
  20. *asynq.Client
  21. }
  22. func (c *ClientEntry) CtxEnqueue(ctx context.Context, task *asynq.Task, ops ...Option) (*asynq.TaskInfo, error) {
  23. zaplog.Info("asynq CtxEnqueue", zap.Any("task", task), zap.Any("ops", ops))
  24. taskInfo, err := c.Client.Enqueue(task, ops...)
  25. if err != nil {
  26. zaplog.Info("asynq CtxEnqueue failed Error ", zap.Any("err", err))
  27. return taskInfo, xerr.WithStack(err)
  28. } else {
  29. return taskInfo, nil
  30. }
  31. }