client_conn.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package grpcutil
  2. import (
  3. "context"
  4. "time"
  5. "kpt-tmr-group/pkg/xerr"
  6. grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  7. grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
  8. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/codes"
  11. )
  12. func DialContext(ctx context.Context, target string, options ...grpc.DialOption) (*grpc.ClientConn, error) {
  13. if len(options) == 0 {
  14. options = DefaultDialOptions()
  15. }
  16. conn, err := grpc.DialContext(ctx, target, options...)
  17. if err != nil {
  18. return nil, xerr.WithStack(err)
  19. }
  20. return conn, err
  21. }
  22. func DefaultDialOptions() []grpc.DialOption {
  23. options := DefaultAsyncDialOptions()
  24. return append(options, grpc.WithBlock())
  25. }
  26. func DefaultAsyncDialOptions() []grpc.DialOption {
  27. unaryInterceptors := []grpc.UnaryClientInterceptor{
  28. grpc_retry.UnaryClientInterceptor(
  29. grpc_retry.WithMax(3),
  30. grpc_retry.WithCodes(codes.Aborted, codes.DeadlineExceeded),
  31. grpc_retry.WithPerRetryTimeout(time.Millisecond*500),
  32. ),
  33. grpc_prometheus.UnaryClientInterceptor,
  34. }
  35. streamInterceptors := []grpc.StreamClientInterceptor{
  36. grpc_retry.StreamClientInterceptor(
  37. grpc_retry.WithMax(3),
  38. grpc_retry.WithCodes(codes.Aborted, codes.DeadlineExceeded),
  39. grpc_retry.WithPerRetryTimeout(time.Millisecond*500),
  40. ),
  41. grpc_prometheus.StreamClientInterceptor,
  42. }
  43. return []grpc.DialOption{
  44. grpc.WithInsecure(),
  45. grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryInterceptors...)),
  46. grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamInterceptors...)),
  47. }
  48. }