entry.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // Package cleanup provides a single point for registering clean up functions.
  2. // This is similar to Finalizer, except that the clean up functions are
  3. // guaranteed to be called if the process terminates normally.
  4. //
  5. // Usage:
  6. //
  7. // In my_package.go
  8. //
  9. // cleanup.Register(func(){
  10. // // Arbitrary clean up function, most likely close goroutine, etc.
  11. // })
  12. //
  13. // In main.go
  14. //
  15. // func main() {
  16. // flag.Parse()
  17. // defer cleanup.Run()
  18. // }
  19. package cleanup
  20. import (
  21. "context"
  22. "reflect"
  23. "sync"
  24. "kpt-event/util/logger"
  25. "kpt-event/util/xerr"
  26. )
  27. // entry global cleanup entry
  28. var entry Entry
  29. // Register adds a function to the global cleanup queue.
  30. func Register(container interface{}) {
  31. entry.Register(container)
  32. }
  33. // Run runs all the global cleanup functions registered.
  34. func Run() {
  35. entry.Run()
  36. }
  37. type Entry struct {
  38. mu sync.Mutex
  39. fns []func()
  40. once sync.Once
  41. }
  42. // Run runs all the cleanup functions registered.
  43. func (entry *Entry) Run() {
  44. log.Infof("cleanup: performing %d cleanups", len(entry.fns))
  45. entry.once.Do(func() {
  46. for _, f := range entry.fns {
  47. f()
  48. }
  49. })
  50. log.Infof("cleanup: all done")
  51. }
  52. // Register adds a function to the cleanup queue.
  53. func (entry *Entry) Register(container interface{}) {
  54. v := reflect.Indirect(reflect.ValueOf(container))
  55. var err error
  56. switch v.Kind() {
  57. case reflect.Func:
  58. err = entry.RegisterFunc(v.Interface())
  59. case reflect.Struct:
  60. err = entry.RegisterStruct(v.Interface())
  61. default:
  62. panic("cleanup: unsupported type")
  63. }
  64. if err != nil {
  65. panic(err)
  66. }
  67. }
  68. func (entry *Entry) RegisterStruct(ctor interface{}) error {
  69. cValue := reflect.Indirect(reflect.ValueOf(ctor))
  70. if cValue.Kind() != reflect.Struct {
  71. return xerr.New("RegisterStruct receive a struct or ptr to struct")
  72. }
  73. for i := 0; i < cValue.NumField(); i++ {
  74. field := cValue.Field(i)
  75. if field.IsZero() {
  76. continue
  77. }
  78. method := field.MethodByName("Close")
  79. if !method.IsValid() {
  80. method = field.MethodByName("Flush")
  81. }
  82. if method.IsValid() {
  83. if err := entry.RegisterFunc(method.Interface()); err != nil {
  84. log.WithError(err).WithField("fieldName", field.Type().Name()).Error("register func failed")
  85. }
  86. }
  87. }
  88. return nil
  89. }
  90. // RegisterFunc receive func() or func() error
  91. func (entry *Entry) RegisterFunc(fn interface{}) error {
  92. fType := reflect.TypeOf(fn)
  93. if fType.Kind() != reflect.Func {
  94. return xerr.New("cleanup: unsupported type")
  95. }
  96. if fType.NumIn() > 0 {
  97. return xerr.New("RegisterFunc receive func() or func() error")
  98. }
  99. if f, ok := fn.(func()); ok {
  100. entry.register(f)
  101. return nil
  102. }
  103. if f, ok := fn.(func() error); ok {
  104. entry.register(func() {
  105. if err := f(); err != nil {
  106. _ = xerr.ReportError(context.Background(), err)
  107. }
  108. })
  109. return nil
  110. }
  111. return xerr.New("RegisterFunc receive func() or func() error")
  112. }
  113. func (entry *Entry) register(fns ...func()) {
  114. entry.mu.Lock()
  115. defer entry.mu.Unlock()
  116. for _, fn := range fns {
  117. entry.fns = append(entry.fns, fn)
  118. }
  119. }