1package concurrent 2 3import ( 4 "context" 5 "fmt" 6 "runtime" 7 "runtime/debug" 8 "sync" 9 "time" 10 "reflect" 11) 12 13// HandlePanic logs goroutine panic by default 14var HandlePanic = func(recovered interface{}, funcName string) { 15 ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered)) 16 ErrorLogger.Println(string(debug.Stack())) 17} 18 19// UnboundedExecutor is a executor without limits on counts of alive goroutines 20// it tracks the goroutine started by it, and can cancel them when shutdown 21type UnboundedExecutor struct { 22 ctx context.Context 23 cancel context.CancelFunc 24 activeGoroutinesMutex *sync.Mutex 25 activeGoroutines map[string]int 26 HandlePanic func(recovered interface{}, funcName string) 27} 28 29// GlobalUnboundedExecutor has the life cycle of the program itself 30// any goroutine want to be shutdown before main exit can be started from this executor 31// GlobalUnboundedExecutor expects the main function to call stop 32// it does not magically knows the main function exits 33var GlobalUnboundedExecutor = NewUnboundedExecutor() 34 35// NewUnboundedExecutor creates a new UnboundedExecutor, 36// UnboundedExecutor can not be created by &UnboundedExecutor{} 37// HandlePanic can be set with a callback to override global HandlePanic 38func NewUnboundedExecutor() *UnboundedExecutor { 39 ctx, cancel := context.WithCancel(context.TODO()) 40 return &UnboundedExecutor{ 41 ctx: ctx, 42 cancel: cancel, 43 activeGoroutinesMutex: &sync.Mutex{}, 44 activeGoroutines: map[string]int{}, 45 } 46} 47 48// Go starts a new goroutine and tracks its lifecycle. 49// Panic will be recovered and logged automatically, except for StopSignal 50func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) { 51 pc := reflect.ValueOf(handler).Pointer() 52 f := runtime.FuncForPC(pc) 53 funcName := f.Name() 54 file, line := f.FileLine(pc) 55 executor.activeGoroutinesMutex.Lock() 56 defer executor.activeGoroutinesMutex.Unlock() 57 startFrom := fmt.Sprintf("%s:%d", file, line) 58 executor.activeGoroutines[startFrom] += 1 59 go func() { 60 defer func() { 61 recovered := recover() 62 // if you want to quit a goroutine without trigger HandlePanic 63 // use runtime.Goexit() to quit 64 if recovered != nil { 65 if executor.HandlePanic == nil { 66 HandlePanic(recovered, funcName) 67 } else { 68 executor.HandlePanic(recovered, funcName) 69 } 70 } 71 executor.activeGoroutinesMutex.Lock() 72 executor.activeGoroutines[startFrom] -= 1 73 executor.activeGoroutinesMutex.Unlock() 74 }() 75 handler(executor.ctx) 76 }() 77} 78 79// Stop cancel all goroutines started by this executor without wait 80func (executor *UnboundedExecutor) Stop() { 81 executor.cancel() 82} 83 84// StopAndWaitForever cancel all goroutines started by this executor and 85// wait until all goroutines exited 86func (executor *UnboundedExecutor) StopAndWaitForever() { 87 executor.StopAndWait(context.Background()) 88} 89 90// StopAndWait cancel all goroutines started by this executor and wait. 91// Wait can be cancelled by the context passed in. 92func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) { 93 executor.cancel() 94 for { 95 oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100) 96 select { 97 case <-oneHundredMilliseconds.C: 98 if executor.checkNoActiveGoroutines() { 99 return 100 } 101 case <-ctx.Done(): 102 return 103 } 104 } 105} 106 107func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool { 108 executor.activeGoroutinesMutex.Lock() 109 defer executor.activeGoroutinesMutex.Unlock() 110 for startFrom, count := range executor.activeGoroutines { 111 if count > 0 { 112 InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit", 113 "startFrom", startFrom, 114 "count", count) 115 return false 116 } 117 } 118 return true 119} 120