123456789101112131415161718192021222324252627282930313233343536373839 |
- package asynqsvc
- import (
- "context"
- "kpt-pasture/config"
- "gitee.com/xuyiping_admin/pkg/logger/zaplog"
- "gitee.com/xuyiping_admin/pkg/xerr"
- "github.com/hibiken/asynq"
- "go.uber.org/zap"
- )
- type Option = asynq.Option
- type Client interface {
- CtxEnqueue(ctx context.Context, task *asynq.Task, ops ...Option) (*asynq.TaskInfo, error)
- }
- func NewClient(cfg *config.AppConfig) Client {
- setting := NewAsynqSetting(&cfg.SideWorkSetting.AsynqSetting)
- client := asynq.NewClient(setting.Redis.RedisClientOpt())
- return &ClientEntry{client}
- }
- type ClientEntry struct {
- *asynq.Client
- }
- func (c *ClientEntry) CtxEnqueue(ctx context.Context, task *asynq.Task, ops ...Option) (*asynq.TaskInfo, error) {
- zaplog.Info("asynq CtxEnqueue", zap.Any("task", task), zap.Any("ops", ops))
- taskInfo, err := c.Client.Enqueue(task, ops...)
- if err != nil {
- zaplog.Info("asynq CtxEnqueue failed Error ", zap.Any("err", err))
- return taskInfo, xerr.WithStack(err)
- } else {
- return taskInfo, nil
- }
- }
|