123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- package svcutil
- import (
- "context"
- "net/http"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- "kpt-grpc-demo/util/healthcheck"
- "kpt-grpc-demo/util/waitutil"
- log "kpt-grpc-demo/util/logger"
- )
- // StandBy graceful doUtilStop func and with HTTP health check at addr
- // it will block and stop when function is finished
- func StandBy(addr string, f func()) {
- stop := WaitSignals()
- grace := &GracefulDo{}
- done := grace.Do(addr, f)
- for {
- select {
- case <-done:
- return
- case <-stop:
- return
- }
- }
- }
- // NeverStop graceful doUtilStop func and with HTTP health check at addr
- // it will block and never stop
- func NeverStop(addr string, f func()) {
- // wait for signal
- stop := WaitSignals()
- grace := &GracefulDo{}
- grace.DoUtilStop(addr, stop, f)
- }
// GracefulDo runs a function alongside the HTTP health check. Its methods
// route the health check through the once field, so a single GracefulDo value
// starts the health check at most one time.
type GracefulDo struct {
	// once guards the single start of the health check.
	once sync.Once
}
- func (g *GracefulDo) Do(addr string, f func()) <-chan struct{} {
- stop := make(chan struct{})
- // start health at once
- go g.withHealthCheck(addr, stop)
- go func() {
- defer close(stop)
- func() {
- defer waitutil.HandleCrash()
- f()
- }()
- }()
- return stop
- }
- func (g *GracefulDo) DoUtilStop(addr string, stop <-chan struct{}, f func()) {
- // start health at once
- go g.withHealthCheck(addr, stop)
- select {
- case <-stop:
- return
- default:
- }
- func() {
- defer waitutil.HandleCrash()
- f()
- }()
- // NOTE: b/c there is no priority selection in golang
- // it is possible for this to race, meaning we could
- // trigger t.C and stopCh, and t.C select falls through.
- // In order to mitigate we re-check stopCh at the beginning
- // of every loop to prevent extra executions of f().
- <-stop
- }
- func (g *GracefulDo) withHealthCheck(addr string, stop <-chan struct{}) {
- g.once.Do(func() {
- HTTPHealthCheck(addr, stop)
- })
- }
- // WaitFor 简单版本(并不优雅) 处理退出, 需要相关处理函数 f 能够阻塞执行
- func WaitFor(addr string, f func(stop <-chan struct{}) error) {
- stop := WaitSignals()
- quit := make(chan struct{})
- go func() {
- HTTPHealthCheck(addr, quit)
- }()
- if err := f(stop); err != nil {
- log.Errorf("%+v", err)
- }
- quit <- struct{}{}
- }
- // HTTPHealthCheck HTTP 模式健康检查,会阻塞执行
- func HTTPHealthCheck(addr string, stop <-chan struct{}) {
- server := &http.Server{Addr: addr, Handler: healthcheck.NewHandler()}
- go func() {
- if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
- log.Errorf("[BgHealthCheck] health server close with err: %+v", err)
- }
- }()
- <-stop
- server.SetKeepAlivesEnabled(false)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- if err := server.Shutdown(ctx); err != nil {
- log.Errorf("[BgHealthCheck] stop server graceful stop with err: %+v", err)
- }
- }
// WaitSignals listens for exit signals. It returns a channel that is closed
// when the first SIGHUP, SIGINT, SIGTERM or SIGQUIT is delivered to the
// process.
func WaitSignals() chan struct{} {
	stop := make(chan struct{})

	// signal.Notify does not block when delivering, so with an unbuffered
	// channel a signal arriving before the goroutine below is receiving would
	// be dropped. Only the first signal matters here, so one slot is enough.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	go func() {
		<-quit
		close(stop)
	}()

	return stop
}
|