1package concurrent
2
3import (
4	"context"
5	"fmt"
6	"runtime"
7	"runtime/debug"
8	"sync"
9	"time"
10	"reflect"
11)
12
13// HandlePanic logs goroutine panic by default
14var HandlePanic = func(recovered interface{}, funcName string) {
15	ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
16	ErrorLogger.Println(string(debug.Stack()))
17}
18
// UnboundedExecutor is an executor that places no limit on the number of live
// goroutines. It tracks the goroutines it starts and can cancel them on
// shutdown through a shared context.
type UnboundedExecutor struct {
	// ctx is handed to every handler started through Go.
	ctx                   context.Context
	// cancel cancels ctx; it is called by Stop and StopAndWait.
	cancel                context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines counts running goroutines, keyed by the "file:line"
	// of the handler function that was started.
	activeGoroutines      map[string]int
	// HandlePanic, when non-nil, is called instead of the package-level
	// HandlePanic for panics recovered from goroutines of this executor.
	HandlePanic           func(recovered interface{}, funcName string)
}
28
// GlobalUnboundedExecutor is intended to live as long as the program itself.
// Any goroutine that should be shut down before main exits can be started
// from this executor. The main function is expected to stop it explicitly:
// the executor has no way of knowing on its own that main is exiting.
var GlobalUnboundedExecutor = NewUnboundedExecutor()
34
35// NewUnboundedExecutor creates a new UnboundedExecutor,
36// UnboundedExecutor can not be created by &UnboundedExecutor{}
37// HandlePanic can be set with a callback to override global HandlePanic
38func NewUnboundedExecutor() *UnboundedExecutor {
39	ctx, cancel := context.WithCancel(context.TODO())
40	return &UnboundedExecutor{
41		ctx:                   ctx,
42		cancel:                cancel,
43		activeGoroutinesMutex: &sync.Mutex{},
44		activeGoroutines:      map[string]int{},
45	}
46}
47
48// Go starts a new goroutine and tracks its lifecycle.
49// Panic will be recovered and logged automatically, except for StopSignal
50func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
51	pc := reflect.ValueOf(handler).Pointer()
52	f := runtime.FuncForPC(pc)
53	funcName := f.Name()
54	file, line := f.FileLine(pc)
55	executor.activeGoroutinesMutex.Lock()
56	defer executor.activeGoroutinesMutex.Unlock()
57	startFrom := fmt.Sprintf("%s:%d", file, line)
58	executor.activeGoroutines[startFrom] += 1
59	go func() {
60		defer func() {
61			recovered := recover()
62			// if you want to quit a goroutine without trigger HandlePanic
63			// use runtime.Goexit() to quit
64			if recovered != nil {
65				if executor.HandlePanic == nil {
66					HandlePanic(recovered, funcName)
67				} else {
68					executor.HandlePanic(recovered, funcName)
69				}
70			}
71			executor.activeGoroutinesMutex.Lock()
72			executor.activeGoroutines[startFrom] -= 1
73			executor.activeGoroutinesMutex.Unlock()
74		}()
75		handler(executor.ctx)
76	}()
77}
78
// Stop cancels the context shared by all goroutines started by this executor
// and returns immediately, without waiting for those goroutines to exit.
func (executor *UnboundedExecutor) Stop() {
	executor.cancel()
}
83
// StopAndWaitForever cancels all goroutines started by this executor and
// blocks until all of them have exited. It delegates to StopAndWait with
// context.Background(), so the wait itself is never cancelled.
func (executor *UnboundedExecutor) StopAndWaitForever() {
	executor.StopAndWait(context.Background())
}
89
90// StopAndWait cancel all goroutines started by this executor and wait.
91// Wait can be cancelled by the context passed in.
92func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
93	executor.cancel()
94	for {
95		oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
96		select {
97		case <-oneHundredMilliseconds.C:
98			if executor.checkNoActiveGoroutines() {
99				return
100			}
101		case <-ctx.Done():
102			return
103		}
104	}
105}
106
107func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
108	executor.activeGoroutinesMutex.Lock()
109	defer executor.activeGoroutinesMutex.Unlock()
110	for startFrom, count := range executor.activeGoroutines {
111		if count > 0 {
112			InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
113				"startFrom", startFrom,
114				"count", count)
115			return false
116		}
117	}
118	return true
119}
120