service.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/afex/hystrix-go/hystrix"
  8. "github.com/longjoy/micro-go-course/section28/goods/common"
  9. "github.com/longjoy/micro-go-course/section28/goods/pkg/discovery"
  10. "github.com/longjoy/micro-go-course/section28/goods/pkg/loadbalancer"
  11. "go.etcd.io/etcd/clientv3"
  12. "io/ioutil"
  13. "log"
  14. "net/http"
  15. "net/url"
  16. "strconv"
  17. "time"
  18. )
  19. type GoodsDetailVO struct {
  20. Id string
  21. Name string
  22. Comments common.CommentListVO
  23. }
  24. type Service interface {
  25. GetGoodsDetail(ctx context.Context, id string) (GoodsDetailVO, error)
  26. HealthCheck() string
  27. }
  28. func NewGoodsServiceImpl(discoveryClient *discovery.DiscoveryClient, loadbalancer loadbalancer.LoadBalancer) Service {
  29. return &GoodsDetailServiceImpl{
  30. discoveryClient: discoveryClient,
  31. loadbalancer: loadbalancer,
  32. }
  33. }
  34. type GoodsDetailServiceImpl struct {
  35. discoveryClient *discovery.DiscoveryClient
  36. loadbalancer loadbalancer.LoadBalancer
  37. callCommentService int
  38. }
  39. func (service *GoodsDetailServiceImpl) GetGoodsDetail(ctx context.Context, id string) (GoodsDetailVO, error) {
  40. detail := GoodsDetailVO{Id: id, Name: "Name"}
  41. commentResult, _ := service.GetGoodsComments(ctx, id)
  42. detail.Comments = commentResult.Detail
  43. return detail, nil
  44. }
  45. var ErrNotServiceInstances = errors.New("instances are not existed")
  46. var ErrLoadBalancer = errors.New("loadbalancer select instance error")
  47. func (service *GoodsDetailServiceImpl) DiscoveryService(ctx context.Context, serviceName string) ([]*discovery.InstanceInfo, error) {
  48. instances, err := service.discoveryClient.DiscoverServices(ctx, serviceName)
  49. if err != nil {
  50. log.Printf("get service info err: %s", err)
  51. }
  52. if instances == nil || len(instances) == 0 {
  53. return nil, ErrNotServiceInstances
  54. }
  55. return instances, nil
  56. }
  57. func (service *GoodsDetailServiceImpl) GetGoodsComments(ctx context.Context, id string) (common.CommentResult, error) {
  58. var result common.CommentResult
  59. serviceName := "comment"
  60. instances, err := service.discoveryClient.DiscoverServices(ctx, serviceName)
  61. if err != nil {
  62. log.Printf("get service info err: %s", err)
  63. }
  64. if instances == nil || len(instances) == 0 {
  65. log.Printf("no instance")
  66. return result, ErrNotServiceInstances
  67. }
  68. selectedInstance, err2 := service.loadbalancer.SelectService(instances)
  69. log.Print("select instance info :" + selectedInstance.Address + ":" + strconv.Itoa(selectedInstance.Port))
  70. if err2 != nil {
  71. log.Printf("loadbalancer get selected instance err: %s", err2)
  72. return result, ErrLoadBalancer
  73. }
  74. call_err := hystrix.Do(serviceName, func() error {
  75. requestUrl := url.URL{
  76. Scheme: "http",
  77. Host: selectedInstance.Address + ":" + strconv.Itoa(selectedInstance.Port),
  78. Path: "/comments/detail",
  79. RawQuery: "id=" + id,
  80. }
  81. resp, err := http.Get(requestUrl.String())
  82. if err != nil {
  83. return err
  84. }
  85. body, _ := ioutil.ReadAll(resp.Body)
  86. jsonErr := json.Unmarshal(body, &result)
  87. if jsonErr != nil {
  88. return jsonErr
  89. }
  90. return nil
  91. }, func(e error) error {
  92. // 断路器打开时的处理逻辑,本示例是直接返回错误提示
  93. return errors.New("Http errors!")
  94. })
  95. if call_err == nil {
  96. return result, nil
  97. } else {
  98. return result, call_err
  99. }
  100. }
  101. func (service *GoodsDetailServiceImpl) InitConfig(ctx context.Context) {
  102. cli, _ := clientv3.New(clientv3.Config{
  103. Endpoints: []string{"127.0.0.1:2379"},
  104. DialTimeout: 5 * time.Second,
  105. })
  106. // get
  107. resp, _ := cli.Get(ctx, "call_service_d")
  108. for _, ev := range resp.Kvs {
  109. fmt.Printf("%s:%s\n", ev.Key, ev.Value)
  110. if string(ev.Key) == "call_service_d" {
  111. service.callCommentService, _ = strconv.Atoi(string(ev.Value))
  112. }
  113. }
  114. rch := cli.Watch(context.Background(), "call_service_d") // <-chan WatchResponse
  115. for wresp := range rch {
  116. for _, ev := range wresp.Events {
  117. fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
  118. if string(ev.Kv.Key) == "call_service_d" {
  119. service.callCommentService, _ = strconv.Atoi(string(ev.Kv.Value))
  120. }
  121. }
  122. }
  123. }
  124. func (service *GoodsDetailServiceImpl) HealthCheck() string {
  125. return "OK"
  126. }