123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package asynqsvc
- import (
- "crypto/tls"
- "kpt-pasture/config"
- "time"
- "gitee.com/xuyiping_admin/pkg/cputil"
- "github.com/hibiken/asynq"
- )
- type AsynqSetting struct {
- Redis AsynqRedisSetting `json:"redis" yaml:"redis"`
- Queues map[string]int `json:"queues,omitempty" yaml:"queues"`
- Concurrency int `json:"concurrency,omitempty" yaml:"concurrency"`
- LogLevel int32 `json:"log_level,omitempty" yaml:"log_level"`
- }
- func NewAsynqSetting(setting *config.AsynqSetting) *AsynqSetting {
- var curr AsynqSetting
- if err := cputil.DeepCopy(&curr, setting); err != nil {
- panic(err)
- }
- return &curr
- }
- type AsynqRedisSetting struct {
- // Network type to use, either tcp or unix.
- // Default is tcp.
- Network string `json:"network,omitempty" yaml:"network" env:"ASYNQ_REDIS_NETWORK"`
- // Redis server address in "host:port" format.
- Addr string `json:"addr,omitempty" yaml:"addr" env:"ASYNQ_REDIS_ADDR"`
- // Username to authenticate the current connection when Redis ACLs are used.
- // See: https://redis.io/commands/auth.
- Username string `json:"username,omitempty" yaml:"username" env:"ASYNQ_REDIS_USERNAME"`
- // Password to authenticate the current connection.
- // See: https://redis.io/commands/auth.
- Password string `json:"password,omitempty" yaml:"password" env:"ASYNQ_REDIS_PASSWORD"`
- // Redis DB to select after connecting to a server.
- // See: https://redis.io/commands/select.
- DB int `json:"db,omitempty" yaml:"db" env:"ASYNQ_REDIS_DB"`
- // Dial timeout for establishing new connections.
- // Default is 5 seconds.
- DialTimeout time.Duration `json:"dialTimeout,omitempty" yaml:"dial_timeout" env:"ASYNQ_REDIS_DIAL_TIMEOUT"`
- // Timeout for socket reads.
- // If timeout is reached, read commands will fail with a timeout error
- // instead of blocking.
- //
- // Use value -1 for no timeout and 0 for default.
- // Default is 3 seconds.
- ReadTimeout time.Duration `json:"readTimeout,omitempty" yaml:"read_timeout" env:"ASYNQ_REDIS_READ_TIMEOUT"`
- // Timeout for socket writes.
- // If timeout is reached, write commands will fail with a timeout error
- // instead of blocking.
- //
- // Use value -1 for no timeout and 0 for default.
- // Default is ReadTimout.
- WriteTimeout time.Duration `json:"writeTimeout,omitempty" yaml:"write_timeout" env:"ASYNQ_REDIS_WRITE_TIMEOUT"`
- // Maximum number of socket connections.
- // Default is 10 connections per every CPU as reported by runtime.NumCPU.
- PoolSize int `json:"poolSize,omitempty" yaml:"pool_size" env:"ASYNQ_REDIS_POOL_SIZE"`
- // TLS Config used to connect to a server.
- // TLS will be negotiated only if this field is set.
- TLSConfig *tls.Config `json:"tlsConfig,omitempty" yaml:"tls_config"`
- }
- func (a *AsynqSetting) Config() asynq.Config {
- return asynq.Config{
- Concurrency: a.Concurrency,
- Queues: a.Queues,
- ErrorHandler: asynq.ErrorHandlerFunc(asynqError),
- LogLevel: asynq.LogLevel(a.LogLevel),
- }
- }
- func (setting AsynqRedisSetting) RedisClientOpt() asynq.RedisClientOpt {
- return asynq.RedisClientOpt{
- Network: setting.Network,
- Addr: setting.Addr,
- Username: setting.Username,
- Password: setting.Password,
- DB: setting.DB,
- DialTimeout: setting.DialTimeout,
- ReadTimeout: setting.ReadTimeout,
- WriteTimeout: setting.WriteTimeout,
- PoolSize: setting.PoolSize,
- TLSConfig: setting.TLSConfig,
- }
- }
|