servlet.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package server
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "net/http"
  7. "os"
  8. "os/signal"
  9. "syscall"
  10. "time"
  11. "kpt-grpc-demo/util/grpc/interceptor/grpcsentry"
  12. "kpt-grpc-demo/util/healthcheck"
  13. grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
  14. grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
  15. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  16. "github.com/prometheus/client_golang/prometheus/promhttp"
  17. "go.uber.org/zap"
  18. "go.uber.org/zap/zapcore"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/health"
  21. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  22. )
  23. type Config struct {
  24. ServerEnv string `json:"server_env"`
  25. ServerName string `json:"server_name"`
  26. HealthCheckAddr string `json:"health_check_addr"`
  27. // Kubernetes readinessProbe checks GET /health and after (failureThreshold * periodSecond) it stops redirecting traffic to the app (because it continuously returns 500)
  28. SignalTermWait time.Duration `json:"signal_term_wait"`
  29. ServerHost string `json:"server_host"`
  30. MetricsHost string `json:"metrics_host"`
  31. SentryDSN string `json:"sentry_dsn"`
  32. }
  33. type Servlet struct {
  34. server *grpc.Server
  35. httpHealth *http.Server
  36. grpcHealth *health.Server
  37. cfg *Config
  38. log *zap.Logger
  39. }
  40. // Server return grpc.Server for register
  41. func (s *Servlet) Server() *grpc.Server {
  42. return s.server
  43. }
  44. // New gRPC Servlet
  45. func New(cfg *Config, opts ...grpc.ServerOption) *Servlet {
  46. lg, err := zap.NewProduction()
  47. if err != nil {
  48. log.Fatal(err)
  49. }
  50. if cfg.SentryDSN != "" {
  51. // sentryhub.MustInit(cfg.SentryDSN)
  52. }
  53. if cfg.SignalTermWait == 0 {
  54. cfg.SignalTermWait = 10 * time.Second
  55. }
  56. server := newGRPCServer(lg, opts...)
  57. servlet := &Servlet{
  58. cfg: cfg,
  59. log: lg,
  60. server: server,
  61. }
  62. return servlet
  63. }
  64. func (s *Servlet) SetServer(server *grpc.Server) {
  65. s.server = server
  66. }
  67. func newGRPCServer(l *zap.Logger, opts ...grpc.ServerOption) *grpc.Server {
  68. zapOptions := []grpc_zap.Option{
  69. grpc_zap.WithDurationField(func(duration time.Duration) zapcore.Field {
  70. return zap.Int64("grpc.time_ns", duration.Nanoseconds())
  71. }),
  72. }
  73. defaultOpts := []grpc.ServerOption{
  74. grpc.ChainStreamInterceptor(
  75. grpc_ctxtags.StreamServerInterceptor(),
  76. grpc_zap.StreamServerInterceptor(l, zapOptions...),
  77. grpc_prometheus.StreamServerInterceptor,
  78. grpcsentry.WithStreamServerHandler(),
  79. ),
  80. grpc.ChainUnaryInterceptor(
  81. grpc_ctxtags.UnaryServerInterceptor(),
  82. grpc_zap.UnaryServerInterceptor(l, zapOptions...),
  83. grpc_prometheus.UnaryServerInterceptor,
  84. grpcsentry.WithUnaryServerHandler(),
  85. ),
  86. }
  87. var serOpts []grpc.ServerOption
  88. if opts != nil {
  89. serOpts = append(defaultOpts, opts...)
  90. } else {
  91. serOpts = defaultOpts
  92. }
  93. return grpc.NewServer(serOpts...)
  94. }
  95. // Run start run service
  96. func (s *Servlet) Run() {
  97. s.startPrometheus()
  98. s.startServe()
  99. s.startHealthMonitor()
  100. s.waitSignal()
  101. }
  102. // Close servlet
  103. func (s *Servlet) Close() error {
  104. s.server.GracefulStop()
  105. return s.log.Sync()
  106. }
  107. func (s *Servlet) startServe() {
  108. s.log.Info(fmt.Sprintf("gRPC listen %s, running in %s mode", s.cfg.ServerHost, s.cfg.ServerEnv))
  109. listen, err := net.Listen("tcp", s.cfg.ServerHost)
  110. if err != nil {
  111. s.log.Fatal(fmt.Sprintf("failed to listen: %v", err))
  112. }
  113. go func() {
  114. if err := s.server.Serve(listen); err != nil {
  115. s.log.Fatal(fmt.Sprintf("grpc server listen err: %s\n", err))
  116. }
  117. }()
  118. }
  119. func (s *Servlet) startPrometheus() {
  120. grpc_prometheus.Register(s.server)
  121. grpc_prometheus.EnableHandlingTimeHistogram()
  122. http.Handle("/metrics", promhttp.Handler())
  123. s.log.Info(fmt.Sprintf("gRPC Prometheus listen on %s", s.cfg.MetricsHost))
  124. go func() {
  125. if err := http.ListenAndServe(s.cfg.MetricsHost, nil); err != nil {
  126. s.log.Fatal(fmt.Sprintf("Failed to start prometheus server: %s", err))
  127. }
  128. }()
  129. }
  130. func (s *Servlet) startHealthMonitor() {
  131. hs := health.NewServer()
  132. hs.SetServingStatus(s.cfg.ServerName, healthpb.HealthCheckResponse_SERVING)
  133. healthpb.RegisterHealthServer(s.server, hs)
  134. s.grpcHealth = hs
  135. server := &http.Server{Addr: s.cfg.HealthCheckAddr, Handler: healthcheck.NewHandler()}
  136. go func() {
  137. if err := server.ListenAndServe(); err != nil {
  138. if err == http.ErrServerClosed {
  139. return
  140. }
  141. s.log.Error("start health server failed", zap.Error(err))
  142. }
  143. }()
  144. s.httpHealth = server
  145. s.log.Info("gRPC Health boot")
  146. }
  147. func (s *Servlet) waitSignal() {
  148. quit := make(chan os.Signal, 1)
  149. signal.Reset(os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
  150. signal.Notify(quit, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
  151. <-quit
  152. s.log.Info("GRPC server receive TERM signal")
  153. if err := s.httpHealth.Close(); err != nil {
  154. s.log.Error("health server shutdown", zap.Error(err))
  155. }
  156. s.grpcHealth.SetServingStatus(s.cfg.ServerName, healthpb.HealthCheckResponse_NOT_SERVING)
  157. time.Sleep(s.cfg.SignalTermWait)
  158. }