1// Package breaker implements the circuit-breaker resiliency pattern for Go. 2package breaker 3 4import ( 5 "errors" 6 "sync" 7 "sync/atomic" 8 "time" 9) 10 11// ErrBreakerOpen is the error returned from Run() when the function is not executed 12// because the breaker is currently open. 13var ErrBreakerOpen = errors.New("circuit breaker is open") 14 15const ( 16 closed uint32 = iota 17 open 18 halfOpen 19) 20 21// Breaker implements the circuit-breaker resiliency pattern 22type Breaker struct { 23 errorThreshold, successThreshold int 24 timeout time.Duration 25 26 lock sync.Mutex 27 state uint32 28 errors, successes int 29 lastError time.Time 30} 31 32// New constructs a new circuit-breaker that starts closed. 33// From closed, the breaker opens if "errorThreshold" errors are seen 34// without an error-free period of at least "timeout". From open, the 35// breaker half-closes after "timeout". From half-open, the breaker closes 36// after "successThreshold" consecutive successes, or opens on a single error. 37func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker { 38 return &Breaker{ 39 errorThreshold: errorThreshold, 40 successThreshold: successThreshold, 41 timeout: timeout, 42 } 43} 44 45// Run will either return ErrBreakerOpen immediately if the circuit-breaker is 46// already open, or it will run the given function and pass along its return 47// value. It is safe to call Run concurrently on the same Breaker. 48func (b *Breaker) Run(work func() error) error { 49 state := atomic.LoadUint32(&b.state) 50 51 if state == open { 52 return ErrBreakerOpen 53 } 54 55 return b.doWork(state, work) 56} 57 58// Go will either return ErrBreakerOpen immediately if the circuit-breaker is 59// already open, or it will run the given function in a separate goroutine. 60// If the function is run, Go will return nil immediately, and will *not* return 61// the return value of the function. It is safe to call Go concurrently on the 62// same Breaker. 63func (b *Breaker) Go(work func() error) error { 64 state := atomic.LoadUint32(&b.state) 65 66 if state == open { 67 return ErrBreakerOpen 68 } 69 70 // errcheck complains about ignoring the error return value, but 71 // that's on purpose; if you want an error from a goroutine you have to 72 // get it over a channel or something 73 go b.doWork(state, work) 74 75 return nil 76} 77 78func (b *Breaker) doWork(state uint32, work func() error) error { 79 var panicValue interface{} 80 81 result := func() error { 82 defer func() { 83 panicValue = recover() 84 }() 85 return work() 86 }() 87 88 if result == nil && panicValue == nil && state == closed { 89 // short-circuit the normal, success path without contending 90 // on the lock 91 return nil 92 } 93 94 // oh well, I guess we have to contend on the lock 95 b.processResult(result, panicValue) 96 97 if panicValue != nil { 98 // as close as Go lets us come to a "rethrow" although unfortunately 99 // we lose the original panicing location 100 panic(panicValue) 101 } 102 103 return result 104} 105 106func (b *Breaker) processResult(result error, panicValue interface{}) { 107 b.lock.Lock() 108 defer b.lock.Unlock() 109 110 if result == nil && panicValue == nil { 111 if b.state == halfOpen { 112 b.successes++ 113 if b.successes == b.successThreshold { 114 b.closeBreaker() 115 } 116 } 117 } else { 118 if b.errors > 0 { 119 expiry := b.lastError.Add(b.timeout) 120 if time.Now().After(expiry) { 121 b.errors = 0 122 } 123 } 124 125 switch b.state { 126 case closed: 127 b.errors++ 128 if b.errors == b.errorThreshold { 129 b.openBreaker() 130 } else { 131 b.lastError = time.Now() 132 } 133 case halfOpen: 134 b.openBreaker() 135 } 136 } 137} 138 139func (b *Breaker) openBreaker() { 140 b.changeState(open) 141 go b.timer() 142} 143 144func (b *Breaker) closeBreaker() { 145 b.changeState(closed) 146} 147 148func (b *Breaker) timer() { 149 time.Sleep(b.timeout) 150 151 b.lock.Lock() 152 defer b.lock.Unlock() 153 154 b.changeState(halfOpen) 155} 156 157func (b *Breaker) changeState(newState uint32) { 158 b.errors = 0 159 b.successes = 0 160 atomic.StoreUint32(&b.state, newState) 161} 162