discovery_client.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package discovery
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "strconv"
  10. "time"
  11. )
  12. // 服务实例结构体
  13. type InstanceInfo struct {
  14. ID string `json:"ID"` // 服务实例ID
  15. Service string `json:"Service,omitempty"` // 服务发现时返回的服务名
  16. Name string `json:"Name"` // 服务名
  17. Tags []string `json:"Tags,omitempty"` // 标签,可用于进行服务过滤
  18. Address string `json:"Address"` // 服务实例HOST
  19. Port int `json:"Port"` // 服务实例端口
  20. Meta map[string]string `json:"Meta,omitempty"` // 元数据
  21. EnableTagOverride bool `json:"EnableTagOverride"` // 是否允许标签覆盖
  22. Check `json:"Check,omitempty"` // 健康检查相关配置
  23. Weights `json:"Weights,omitempty"` // 权重
  24. }
  25. type Check struct {
  26. DeregisterCriticalServiceAfter string `json:"DeregisterCriticalServiceAfter"` // 多久之后注销服务
  27. Args []string `json:"Args,omitempty"` // 请求参数
  28. HTTP string `json:"HTTP"` // 健康检查地址
  29. Interval string `json:"Interval,omitempty"` // Consul 主动检查间隔
  30. TTL string `json:"TTL,omitempty"` // 服务实例主动维持心跳间隔,与Interval只存其一
  31. }
  32. type Weights struct {
  33. Passing int `json:"Passing"`
  34. Warning int `json:"Warning"`
  35. }
  36. type DiscoveryClient struct {
  37. host string // Consul 的 Host
  38. port int // Consul 的 端口
  39. }
  40. func NewDiscoveryClient(host string, port int) *DiscoveryClient {
  41. return &DiscoveryClient{
  42. host: host,
  43. port: port,
  44. }
  45. }
  46. func (consulClient *DiscoveryClient) Register(ctx context.Context, serviceName, instanceId, healthCheckUrl string, instanceHost string, instancePort int, meta map[string]string, weights *Weights) error {
  47. instanceInfo := &InstanceInfo{
  48. ID: instanceId,
  49. Name: serviceName,
  50. Address: instanceHost,
  51. Port: instancePort,
  52. Meta: meta,
  53. EnableTagOverride: false,
  54. Check: Check{
  55. DeregisterCriticalServiceAfter: "30s",
  56. HTTP: "http://" + instanceHost + ":" + strconv.Itoa(instancePort) + healthCheckUrl,
  57. Interval: "15s",
  58. },
  59. }
  60. if weights != nil {
  61. instanceInfo.Weights = *weights
  62. } else {
  63. instanceInfo.Weights = Weights{
  64. Passing: 10,
  65. Warning: 1,
  66. }
  67. }
  68. byteData, err := json.Marshal(instanceInfo)
  69. if err != nil {
  70. log.Printf("json format err: %s", err)
  71. return err
  72. }
  73. req, err := http.NewRequest("PUT",
  74. "http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/register",
  75. bytes.NewReader(byteData))
  76. if err != nil {
  77. return err
  78. }
  79. req.Header.Set("Content-Type", "application/json;charset=UTF-8")
  80. client := http.Client{}
  81. client.Timeout = time.Second * 2
  82. resp, err := client.Do(req)
  83. if err != nil {
  84. log.Printf("register service err : %s", err)
  85. return err
  86. }
  87. defer resp.Body.Close()
  88. if resp.StatusCode != 200 {
  89. log.Printf("register service http request errCode : %v", resp.StatusCode)
  90. return fmt.Errorf("register service http request errCode : %v", resp.StatusCode)
  91. }
  92. log.Println("register service success")
  93. return nil
  94. }
  95. func (consulClient *DiscoveryClient) Deregister(ctx context.Context, instanceId string) error {
  96. req, err := http.NewRequest("PUT",
  97. "http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/agent/service/deregister/"+instanceId, nil)
  98. if err != nil {
  99. log.Printf("req format err: %s", err)
  100. return err
  101. }
  102. client := http.Client{}
  103. client.Timeout = time.Second * 2
  104. resp, err := client.Do(req)
  105. if err != nil {
  106. log.Printf("deregister service err : %s", err)
  107. return err
  108. }
  109. resp.Body.Close()
  110. if resp.StatusCode != 200 {
  111. log.Printf("deresigister service http request errCode : %v", resp.StatusCode)
  112. return fmt.Errorf("deresigister service http request errCode : %v", resp.StatusCode)
  113. }
  114. log.Println("deregister service success")
  115. return nil
  116. }
  117. func (consulClient *DiscoveryClient) DiscoverServices(ctx context.Context, serviceName string) ([]*InstanceInfo, error) {
  118. req, err := http.NewRequest("GET",
  119. "http://"+consulClient.host+":"+strconv.Itoa(consulClient.port)+"/v1/health/service/"+serviceName, nil)
  120. if err != nil {
  121. log.Printf("req format err: %s", err)
  122. return nil, err
  123. }
  124. client := http.Client{}
  125. client.Timeout = time.Second * 2
  126. resp, err := client.Do(req)
  127. if err != nil {
  128. log.Printf("discover service err : %s", err)
  129. return nil, err
  130. }
  131. defer resp.Body.Close()
  132. if resp.StatusCode != 200 {
  133. log.Printf("discover service http request errCode : %v", resp.StatusCode)
  134. return nil, fmt.Errorf("discover service http request errCode : %v", resp.StatusCode)
  135. }
  136. var serviceList []struct {
  137. Service InstanceInfo `json:"Service"`
  138. }
  139. err = json.NewDecoder(resp.Body).Decode(&serviceList)
  140. if err != nil {
  141. log.Printf("format service info err : %s", err)
  142. return nil, err
  143. }
  144. instances := make([]*InstanceInfo, len(serviceList))
  145. for i := 0; i < len(instances); i++ {
  146. instances[i] = &serviceList[i].Service
  147. }
  148. return instances, nil
  149. }