discovery_client.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package discovery
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "strconv"
  10. "time"
  11. )
  12. // 服务实例结构体
  13. type InstanceInfo struct {
  14. ID string `json:"ID"` // 服务实例ID
  15. Service string `json:"Service,omitempty"` // 服务发现时返回的服务名
  16. Name string `json:"Name"` // 服务名
  17. Tags []string `json:"Tags,omitempty"` // 标签,可用于进行服务过滤
  18. Address string `json:"Address"` // 服务实例HOST
  19. Port int `json:"Port"` // 服务实例端口
  20. Meta map[string]string `json:"Meta,omitempty"` // 元数据
  21. EnableTagOverride bool `json:"EnableTagOverride"` // 是否允许标签覆盖
  22. Check `json:"Check,omitempty"` // 健康检查相关配置
  23. Weights `json:"Weights,omitempty"` // 权重
  24. CurWeight int `json:"CurWeights,omitempty"` // 权重
  25. }
  26. type Check struct {
  27. DeregisterCriticalServiceAfter string `json:"DeregisterCriticalServiceAfter"` // 多久之后注销服务
  28. Args []string `json:"Args,omitempty"` // 请求参数
  29. HTTP string `json:"HTTP"` // 健康检查地址
  30. Interval string `json:"Interval,omitempty"` // Consul 主动检查间隔
  31. TTL string `json:"TTL,omitempty"` // 服务实例主动维持心跳间隔,与Interval只存其一
  32. }
  33. type Weights struct {
  34. Passing int `json:"Passing"`
  35. Warning int `json:"Warning"`
  36. }
  37. type DiscoveryClient struct {
  38. host string // Consul 的 Host
  39. port int // Consul 的 端口
  40. }
  41. func NewDiscoveryClient(host string, port int) *DiscoveryClient {
  42. return &DiscoveryClient{
  43. host: host,
  44. port: port,
  45. }
  46. }
  47. func (consulClient *DiscoveryClient) Register(ctx context.Context, serviceName, instanceId, healthCheckUrl string, instanceHost string, instancePort int, meta map[string]string, weights *Weights) error {
  48. instanceInfo := &InstanceInfo{
  49. ID: instanceId,
  50. Name: serviceName,
  51. Address: instanceHost,
  52. Port: instancePort,
  53. Meta: meta,
  54. EnableTagOverride: false,
  55. Check: Check{
  56. DeregisterCriticalServiceAfter: "30s",
  57. HTTP: "http://" + instanceHost + ":" + strconv.Itoa(instancePort) + healthCheckUrl,
  58. Interval: "15s",
  59. },
  60. }
  61. if weights != nil {
  62. instanceInfo.Weights = *weights
  63. } else {
  64. instanceInfo.Weights = Weights{
  65. Passing: 10,
  66. Warning: 1,
  67. }
  68. }
  69. byteData, err := json.Marshal(instanceInfo)
  70. if err != nil {
  71. log.Printf("json format err: %s", err)
  72. return err
  73. }
  74. req, err := http.NewRequest("PUT",
  75. "http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/register",
  76. bytes.NewReader(byteData))
  77. if err != nil {
  78. return err
  79. }
  80. req.Header.Set("Content-Type", "application/json;charset=UTF-8")
  81. client := http.Client{}
  82. client.Timeout = time.Second * 2
  83. resp, err := client.Do(req)
  84. if err != nil {
  85. log.Printf("register service err : %s", err)
  86. return err
  87. }
  88. defer resp.Body.Close()
  89. if resp.StatusCode != 200 {
  90. log.Printf("register service http request errCode : %v", resp.StatusCode)
  91. return fmt.Errorf("register service http request errCode : %v", resp.StatusCode)
  92. }
  93. log.Println("register service success")
  94. return nil
  95. }
  96. func (consulClient *DiscoveryClient) Deregister(ctx context.Context, instanceId string) error {
  97. req, err := http.NewRequest("PUT",
  98. "http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/deregister/"+instanceId, nil)
  99. if err != nil {
  100. log.Printf("req format err: %s", err)
  101. return err
  102. }
  103. client := http.Client{}
  104. client.Timeout = time.Second * 2
  105. resp, err := client.Do(req)
  106. if err != nil {
  107. log.Printf("deregister service err : %s", err)
  108. return err
  109. }
  110. resp.Body.Close()
  111. if resp.StatusCode != 200 {
  112. log.Printf("deresigister service http request errCode : %v", resp.StatusCode)
  113. return fmt.Errorf("deresigister service http request errCode : %v", resp.StatusCode)
  114. }
  115. log.Println("deregister service success")
  116. return nil
  117. }
  118. func (consulClient *DiscoveryClient) DiscoverServices(ctx context.Context, serviceName string) ([]*InstanceInfo, error) {
  119. req, err := http.NewRequest("GET",
  120. "http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/health/service/"+serviceName, nil)
  121. if err != nil {
  122. log.Printf("req format err: %s", err)
  123. return nil, err
  124. }
  125. client := http.Client{}
  126. client.Timeout = time.Second * 2
  127. resp, err := client.Do(req)
  128. if err != nil {
  129. log.Printf("discover service err : %s", err)
  130. return nil, err
  131. }
  132. defer resp.Body.Close()
  133. if resp.StatusCode != 200 {
  134. log.Printf("discover service http request errCode : %v", resp.StatusCode)
  135. return nil, fmt.Errorf("discover service http request errCode : %v", resp.StatusCode)
  136. }
  137. var serviceList []struct {
  138. Service InstanceInfo `json:"Service"`
  139. }
  140. err = json.NewDecoder(resp.Body).Decode(&serviceList)
  141. if err != nil {
  142. log.Printf("format service info err : %s", err)
  143. return nil, err
  144. }
  145. instances := make([]*InstanceInfo, len(serviceList))
  146. for i := 0; i < len(instances); i++ {
  147. instances[i] = &serviceList[i].Service
  148. }
  149. return instances, nil
  150. }