1// Package breaker implements the circuit-breaker resiliency pattern for Go.
2package breaker
3
4import (
5	"errors"
6	"sync"
7	"sync/atomic"
8	"time"
9)
10
11// ErrBreakerOpen is the error returned from Run() when the function is not executed
12// because the breaker is currently open.
13var ErrBreakerOpen = errors.New("circuit breaker is open")
14
15const (
16	closed uint32 = iota
17	open
18	halfOpen
19)
20
21// Breaker implements the circuit-breaker resiliency pattern
22type Breaker struct {
23	errorThreshold, successThreshold int
24	timeout                          time.Duration
25
26	lock              sync.Mutex
27	state             uint32
28	errors, successes int
29	lastError         time.Time
30}
31
32// New constructs a new circuit-breaker that starts closed.
33// From closed, the breaker opens if "errorThreshold" errors are seen
34// without an error-free period of at least "timeout". From open, the
35// breaker half-closes after "timeout". From half-open, the breaker closes
36// after "successThreshold" consecutive successes, or opens on a single error.
37func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker {
38	return &Breaker{
39		errorThreshold:   errorThreshold,
40		successThreshold: successThreshold,
41		timeout:          timeout,
42	}
43}
44
45// Run will either return ErrBreakerOpen immediately if the circuit-breaker is
46// already open, or it will run the given function and pass along its return
47// value. It is safe to call Run concurrently on the same Breaker.
48func (b *Breaker) Run(work func() error) error {
49	state := atomic.LoadUint32(&b.state)
50
51	if state == open {
52		return ErrBreakerOpen
53	}
54
55	return b.doWork(state, work)
56}
57
58// Go will either return ErrBreakerOpen immediately if the circuit-breaker is
59// already open, or it will run the given function in a separate goroutine.
60// If the function is run, Go will return nil immediately, and will *not* return
61// the return value of the function. It is safe to call Go concurrently on the
62// same Breaker.
63func (b *Breaker) Go(work func() error) error {
64	state := atomic.LoadUint32(&b.state)
65
66	if state == open {
67		return ErrBreakerOpen
68	}
69
70	// errcheck complains about ignoring the error return value, but
71	// that's on purpose; if you want an error from a goroutine you have to
72	// get it over a channel or something
73	go b.doWork(state, work)
74
75	return nil
76}
77
78func (b *Breaker) doWork(state uint32, work func() error) error {
79	var panicValue interface{}
80
81	result := func() error {
82		defer func() {
83			panicValue = recover()
84		}()
85		return work()
86	}()
87
88	if result == nil && panicValue == nil && state == closed {
89		// short-circuit the normal, success path without contending
90		// on the lock
91		return nil
92	}
93
94	// oh well, I guess we have to contend on the lock
95	b.processResult(result, panicValue)
96
97	if panicValue != nil {
98		// as close as Go lets us come to a "rethrow" although unfortunately
99		// we lose the original panicing location
100		panic(panicValue)
101	}
102
103	return result
104}
105
106func (b *Breaker) processResult(result error, panicValue interface{}) {
107	b.lock.Lock()
108	defer b.lock.Unlock()
109
110	if result == nil && panicValue == nil {
111		if b.state == halfOpen {
112			b.successes++
113			if b.successes == b.successThreshold {
114				b.closeBreaker()
115			}
116		}
117	} else {
118		if b.errors > 0 {
119			expiry := b.lastError.Add(b.timeout)
120			if time.Now().After(expiry) {
121				b.errors = 0
122			}
123		}
124
125		switch b.state {
126		case closed:
127			b.errors++
128			if b.errors == b.errorThreshold {
129				b.openBreaker()
130			} else {
131				b.lastError = time.Now()
132			}
133		case halfOpen:
134			b.openBreaker()
135		}
136	}
137}
138
139func (b *Breaker) openBreaker() {
140	b.changeState(open)
141	go b.timer()
142}
143
144func (b *Breaker) closeBreaker() {
145	b.changeState(closed)
146}
147
148func (b *Breaker) timer() {
149	time.Sleep(b.timeout)
150
151	b.lock.Lock()
152	defer b.lock.Unlock()
153
154	b.changeState(halfOpen)
155}
156
157func (b *Breaker) changeState(newState uint32) {
158	b.errors = 0
159	b.successes = 0
160	atomic.StoreUint32(&b.state, newState)
161}
162