1package concurrent
2
3import (
4	"context"
5	"fmt"
6	"runtime"
7	"sync"
8	"time"
9	"runtime/debug"
10)
11
// LogInfo is the hook this package calls to report informational events.
// It is a package-level variable so that it can be replaced with a real
// logger; the default implementation discards everything it is given.
//
// event names what happened; properties carries additional details (the call
// in this file passes them as alternating key/value pairs).
var LogInfo = func(event string, properties ...interface{}) {}
14
// LogPanic is the hook called with the value recovered from a panicking
// goroutine. It is a package-level variable so that it can be replaced.
//
// The default implementation writes "paniced: <value>" followed by a newline
// using fmt, dumps the current stack with debug.PrintStack, and returns
// recovered unchanged. properties is accepted for replacement hooks and is
// ignored by the default.
var LogPanic = func(recovered interface{}, properties ...interface{}) interface{} {
	// A single Printf with an explicit newline emits exactly what
	// Println(Sprintf(...)) did, without building an intermediate string.
	fmt.Printf("paniced: %v\n", recovered)
	debug.PrintStack()
	return recovered
}
20
// StopSignal is a sentinel panic value: a goroutine started through
// UnboundedExecutor.Go that panics with StopSignal is recovered without
// LogPanic being called.
const StopSignal = "STOP!"
22
// UnboundedExecutor starts goroutines on demand, with no limit on how many
// may run at once, and keeps a per-call-site count of the ones that have not
// yet returned so that callers can cancel them and wait for them to finish.
type UnboundedExecutor struct {
	// ctx is passed to every handler started through Go; cancel cancels it.
	ctx                   context.Context
	cancel                context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines maps the "file:line" of a Go call site to the number
	// of goroutines started from there that are still running.
	activeGoroutines      map[string]int
}
29
// GlobalUnboundedExecutor is a package-level executor created when the package
// is initialized, so it lives for as long as the program itself.
// Any goroutine that should be shut down before main exits can be started
// from this executor.
var GlobalUnboundedExecutor = NewUnboundedExecutor()
33
34func NewUnboundedExecutor() *UnboundedExecutor {
35	ctx, cancel := context.WithCancel(context.TODO())
36	return &UnboundedExecutor{
37		ctx:                   ctx,
38		cancel:                cancel,
39		activeGoroutinesMutex: &sync.Mutex{},
40		activeGoroutines:      map[string]int{},
41	}
42}
43
44func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
45	_, file, line, _ := runtime.Caller(1)
46	executor.activeGoroutinesMutex.Lock()
47	defer executor.activeGoroutinesMutex.Unlock()
48	startFrom := fmt.Sprintf("%s:%d", file, line)
49	executor.activeGoroutines[startFrom] += 1
50	go func() {
51		defer func() {
52			recovered := recover()
53			if recovered != nil && recovered != StopSignal {
54				LogPanic(recovered)
55			}
56			executor.activeGoroutinesMutex.Lock()
57			defer executor.activeGoroutinesMutex.Unlock()
58			executor.activeGoroutines[startFrom] -= 1
59		}()
60		handler(executor.ctx)
61	}()
62}
63
// Stop cancels the context that is passed to every handler started through
// Go. It returns immediately and does not wait for those goroutines to exit.
func (executor *UnboundedExecutor) Stop() {
	executor.cancel()
}
67
// StopAndWaitForever cancels the executor's context and blocks until every
// goroutine started through Go has returned. It calls StopAndWait with
// context.Background(), so the wait has no deadline.
func (executor *UnboundedExecutor) StopAndWaitForever() {
	executor.StopAndWait(context.Background())
}
71
72func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
73	executor.cancel()
74	for {
75		fiveSeconds := time.NewTimer(time.Millisecond * 100)
76		select {
77		case <-fiveSeconds.C:
78		case <-ctx.Done():
79			return
80		}
81		if executor.checkGoroutines() {
82			return
83		}
84	}
85}
86
87func (executor *UnboundedExecutor) checkGoroutines() bool {
88	executor.activeGoroutinesMutex.Lock()
89	defer executor.activeGoroutinesMutex.Unlock()
90	for startFrom, count := range executor.activeGoroutines {
91		if count > 0 {
92			LogInfo("event!unbounded_executor.still waiting goroutines to quit",
93				"startFrom", startFrom,
94				"count", count)
95			return false
96		}
97	}
98	return true
99}
100