package asynqsvc import ( "crypto/tls" "kpt-pasture/config" "time" "gitee.com/xuyiping_admin/pkg/cputil" "github.com/hibiken/asynq" ) type AsynqSetting struct { Redis AsynqRedisSetting `json:"redis" yaml:"redis"` Queues map[string]int `json:"queues,omitempty" yaml:"queues"` Concurrency int `json:"concurrency,omitempty" yaml:"concurrency"` LogLevel int32 `json:"log_level,omitempty" yaml:"log_level"` } func NewAsynqSetting(setting *config.AsynqSetting) *AsynqSetting { var curr AsynqSetting if err := cputil.DeepCopy(&curr, setting); err != nil { panic(err) } return &curr } type AsynqRedisSetting struct { // Network type to use, either tcp or unix. // Default is tcp. Network string `json:"network,omitempty" yaml:"network" env:"ASYNQ_REDIS_NETWORK"` // Redis server address in "host:port" format. Addr string `json:"addr,omitempty" yaml:"addr" env:"ASYNQ_REDIS_ADDR"` // Username to authenticate the current connection when Redis ACLs are used. // See: https://redis.io/commands/auth. Username string `json:"username,omitempty" yaml:"username" env:"ASYNQ_REDIS_USERNAME"` // Password to authenticate the current connection. // See: https://redis.io/commands/auth. Password string `json:"password,omitempty" yaml:"password" env:"ASYNQ_REDIS_PASSWORD"` // Redis DB to select after connecting to a server. // See: https://redis.io/commands/select. DB int `json:"db,omitempty" yaml:"db" env:"ASYNQ_REDIS_DB"` // Dial timeout for establishing new connections. // Default is 5 seconds. DialTimeout time.Duration `json:"dialTimeout,omitempty" yaml:"dial_timeout" env:"ASYNQ_REDIS_DIAL_TIMEOUT"` // Timeout for socket reads. // If timeout is reached, read commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration `json:"readTimeout,omitempty" yaml:"read_timeout" env:"ASYNQ_REDIS_READ_TIMEOUT"` // Timeout for socket writes. // If timeout is reached, write commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is ReadTimout. WriteTimeout time.Duration `json:"writeTimeout,omitempty" yaml:"write_timeout" env:"ASYNQ_REDIS_WRITE_TIMEOUT"` // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int `json:"poolSize,omitempty" yaml:"pool_size" env:"ASYNQ_REDIS_POOL_SIZE"` // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config `json:"tlsConfig,omitempty" yaml:"tls_config"` } func (a *AsynqSetting) Config() asynq.Config { return asynq.Config{ Concurrency: a.Concurrency, Queues: a.Queues, ErrorHandler: asynq.ErrorHandlerFunc(asynqError), LogLevel: asynq.LogLevel(a.LogLevel), } } func (setting AsynqRedisSetting) RedisClientOpt() asynq.RedisClientOpt { return asynq.RedisClientOpt{ Network: setting.Network, Addr: setting.Addr, Username: setting.Username, Password: setting.Password, DB: setting.DB, DialTimeout: setting.DialTimeout, ReadTimeout: setting.ReadTimeout, WriteTimeout: setting.WriteTimeout, PoolSize: setting.PoolSize, TLSConfig: setting.TLSConfig, } }