graceful_do.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package svcutil
  2. import (
  3. "context"
  4. "net/http"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "syscall"
  9. "time"
  10. "kpt-grpc-demo/util/healthcheck"
  11. "kpt-grpc-demo/util/waitutil"
  12. log "kpt-grpc-demo/util/logger"
  13. )
  14. // StandBy graceful doUtilStop func and with HTTP health check at addr
  15. // it will block and stop when function is finished
  16. func StandBy(addr string, f func()) {
  17. stop := WaitSignals()
  18. grace := &GracefulDo{}
  19. done := grace.Do(addr, f)
  20. for {
  21. select {
  22. case <-done:
  23. return
  24. case <-stop:
  25. return
  26. }
  27. }
  28. }
  29. // NeverStop graceful doUtilStop func and with HTTP health check at addr
  30. // it will block and never stop
  31. func NeverStop(addr string, f func()) {
  32. // wait for signal
  33. stop := WaitSignals()
  34. grace := &GracefulDo{}
  35. grace.DoUtilStop(addr, stop, f)
  36. }
  // GracefulDo runs a function alongside an HTTP health check server.
  // The health check is started at most once per GracefulDo value.
  type GracefulDo struct {
  	// once ensures the health check server is only started a single time.
  	once sync.Once
  }
  40. func (g *GracefulDo) Do(addr string, f func()) <-chan struct{} {
  41. stop := make(chan struct{})
  42. // start health at once
  43. go g.withHealthCheck(addr, stop)
  44. go func() {
  45. defer close(stop)
  46. func() {
  47. defer waitutil.HandleCrash()
  48. f()
  49. }()
  50. }()
  51. return stop
  52. }
  53. func (g *GracefulDo) DoUtilStop(addr string, stop <-chan struct{}, f func()) {
  54. // start health at once
  55. go g.withHealthCheck(addr, stop)
  56. select {
  57. case <-stop:
  58. return
  59. default:
  60. }
  61. func() {
  62. defer waitutil.HandleCrash()
  63. f()
  64. }()
  65. // NOTE: b/c there is no priority selection in golang
  66. // it is possible for this to race, meaning we could
  67. // trigger t.C and stopCh, and t.C select falls through.
  68. // In order to mitigate we re-check stopCh at the beginning
  69. // of every loop to prevent extra executions of f().
  70. <-stop
  71. }
  72. func (g *GracefulDo) withHealthCheck(addr string, stop <-chan struct{}) {
  73. g.once.Do(func() {
  74. HTTPHealthCheck(addr, stop)
  75. })
  76. }
  77. // WaitFor 简单版本(并不优雅) 处理退出, 需要相关处理函数 f 能够阻塞执行
  78. func WaitFor(addr string, f func(stop <-chan struct{}) error) {
  79. stop := WaitSignals()
  80. quit := make(chan struct{})
  81. go func() {
  82. HTTPHealthCheck(addr, quit)
  83. }()
  84. if err := f(stop); err != nil {
  85. log.Errorf("%+v", err)
  86. }
  87. quit <- struct{}{}
  88. }
  89. // HTTPHealthCheck HTTP 模式健康检查,会阻塞执行
  90. func HTTPHealthCheck(addr string, stop <-chan struct{}) {
  91. server := &http.Server{Addr: addr, Handler: healthcheck.NewHandler()}
  92. go func() {
  93. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  94. log.Errorf("[BgHealthCheck] health server close with err: %+v", err)
  95. }
  96. }()
  97. <-stop
  98. server.SetKeepAlivesEnabled(false)
  99. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  100. defer cancel()
  101. if err := server.Shutdown(ctx); err != nil {
  102. log.Errorf("[BgHealthCheck] stop server graceful stop with err: %+v", err)
  103. }
  104. }
  // WaitSignals listens for exit signals. It returns a channel that is closed
  // when the process receives the first of SIGHUP, SIGINT, SIGTERM or SIGQUIT.
  func WaitSignals() chan struct{} {
  	stop := make(chan struct{})

  	// signal.Notify never blocks when delivering, so an unbuffered channel
  	// would drop a signal that arrives before the goroutine below is
  	// receiving. One slot is enough because only the first signal matters.
  	quit := make(chan os.Signal, 1)
  	signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

  	go func() {
  		<-quit
  		close(stop)
  	}()
  	return stop
  }
  116. }