1// Package cbreaker implements circuit breaker similar to  https://github.com/Netflix/Hystrix/wiki/How-it-Works
2//
// Vulcan circuit breaker watches for an error condition to match,
// after which it activates the fallback scenario, e.g. returns a response code
// or redirects the request to another location.
6//
7// Circuit breakers start in the Standby state first, observing responses and watching location metrics.
8//
// Once the circuit breaker condition is met, it enters the "Tripped" state, where it activates the fallback scenario
// for all requests during the FallbackDuration time period and resets the stats for the location.
11//
// After the FallbackDuration time period passes, the circuit breaker enters the "Recovering" state, during which it
// starts passing some traffic back to the endpoints, increasing the amount of passed requests using a linear function:
14//
15//    allowedRequestsRatio = 0.5 * (Now() - StartRecovery())/RecoveryDuration
16//
17// Two scenarios are possible in the "Recovering" state:
// 1. The condition matches again: this resets the state to "Tripped" and resets the timer.
// 2. The condition does not match before RecoveryDuration elapses: the circuit breaker enters the "Standby" state.
20//
// It is possible to define actions (e.g. webhooks) for transitions between states:
//
// * OnTripped action is called on every transition into Tripped (Standby -> Tripped and Recovering -> Tripped)
// * OnStandby action is called on transition (Recovering -> Standby)
25//
26package cbreaker
27
28import (
29	"fmt"
30	"net/http"
31	"sync"
32	"time"
33
34	"github.com/mailgun/timetools"
35	log "github.com/sirupsen/logrus"
36	"github.com/vulcand/oxy/memmetrics"
37	"github.com/vulcand/oxy/utils"
38)
39
40// CircuitBreaker is http.Handler that implements circuit breaker pattern
41type CircuitBreaker struct {
42	m       *sync.RWMutex
43	metrics *memmetrics.RTMetrics
44
45	condition hpredicate
46
47	fallbackDuration time.Duration
48	recoveryDuration time.Duration
49
50	onTripped SideEffect
51	onStandby SideEffect
52
53	state cbState
54	until time.Time
55
56	rc *ratioController
57
58	checkPeriod time.Duration
59	lastCheck   time.Time
60
61	fallback http.Handler
62	next     http.Handler
63
64	clock timetools.TimeProvider
65
66	log *log.Logger
67}
68
69// New creates a new CircuitBreaker middleware
70func New(next http.Handler, expression string, options ...CircuitBreakerOption) (*CircuitBreaker, error) {
71	cb := &CircuitBreaker{
72		m:    &sync.RWMutex{},
73		next: next,
74		// Default values. Might be overwritten by options below.
75		clock:            &timetools.RealTime{},
76		checkPeriod:      defaultCheckPeriod,
77		fallbackDuration: defaultFallbackDuration,
78		recoveryDuration: defaultRecoveryDuration,
79		fallback:         defaultFallback,
80		log:              log.StandardLogger(),
81	}
82
83	for _, s := range options {
84		if err := s(cb); err != nil {
85			return nil, err
86		}
87	}
88
89	condition, err := parseExpression(expression)
90	if err != nil {
91		return nil, err
92	}
93	cb.condition = condition
94
95	mt, err := memmetrics.NewRTMetrics()
96	if err != nil {
97		return nil, err
98	}
99	cb.metrics = mt
100
101	return cb, nil
102}
103
104// Logger defines the logger the circuit breaker will use.
105//
106// It defaults to logrus.StandardLogger(), the global logger used by logrus.
107func Logger(l *log.Logger) CircuitBreakerOption {
108	return func(c *CircuitBreaker) error {
109		c.log = l
110		return nil
111	}
112}
113
114func (c *CircuitBreaker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
115	if c.log.Level >= log.DebugLevel {
116		logEntry := c.log.WithField("Request", utils.DumpHttpRequest(req))
117		logEntry.Debug("vulcand/oxy/circuitbreaker: begin ServeHttp on request")
118		defer logEntry.Debug("vulcand/oxy/circuitbreaker: completed ServeHttp on request")
119	}
120	if c.activateFallback(w, req) {
121		c.fallback.ServeHTTP(w, req)
122		return
123	}
124	c.serve(w, req)
125}
126
127// Fallback sets the fallback handler to be called by circuit breaker handler.
128func (c *CircuitBreaker) Fallback(f http.Handler) {
129	c.fallback = f
130}
131
132// Wrap sets the next handler to be called by circuit breaker handler.
133func (c *CircuitBreaker) Wrap(next http.Handler) {
134	c.next = next
135}
136
137// updateState updates internal state and returns true if fallback should be used and false otherwise
138func (c *CircuitBreaker) activateFallback(w http.ResponseWriter, req *http.Request) bool {
139	// Quick check with read locks optimized for normal operation use-case
140	if c.isStandby() {
141		return false
142	}
143	// Circuit breaker is in tripped or recovering state
144	c.m.Lock()
145	defer c.m.Unlock()
146
147	c.log.Warnf("%v is in error state", c)
148
149	switch c.state {
150	case stateStandby:
151		// someone else has set it to standby just now
152		return false
153	case stateTripped:
154		if c.clock.UtcNow().Before(c.until) {
155			return true
156		}
157		// We have been in active state enough, enter recovering state
158		c.setRecovering()
159		fallthrough
160	case stateRecovering:
161		// We have been in recovering state enough, enter standby and allow request
162		if c.clock.UtcNow().After(c.until) {
163			c.setState(stateStandby, c.clock.UtcNow())
164			return false
165		}
166		// ratio controller allows this request
167		if c.rc.allowRequest() {
168			return false
169		}
170		return true
171	}
172	return false
173}
174
175func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) {
176	start := c.clock.UtcNow()
177	p := utils.NewProxyWriterWithLogger(w, c.log)
178
179	c.next.ServeHTTP(p, req)
180
181	latency := c.clock.UtcNow().Sub(start)
182	c.metrics.Record(p.StatusCode(), latency)
183
184	// Note that this call is less expensive than it looks -- checkCondition only performs the real check
185	// periodically. Because of that we can afford to call it here on every single response.
186	c.checkAndSet()
187}
188
189func (c *CircuitBreaker) isStandby() bool {
190	c.m.RLock()
191	defer c.m.RUnlock()
192	return c.state == stateStandby
193}
194
195// String returns log-friendly representation of the circuit breaker state
196func (c *CircuitBreaker) String() string {
197	switch c.state {
198	case stateTripped, stateRecovering:
199		return fmt.Sprintf("CircuitBreaker(state=%v, until=%v)", c.state, c.until)
200	default:
201		return fmt.Sprintf("CircuitBreaker(state=%v)", c.state)
202	}
203}
204
205// exec executes side effect
206func (c *CircuitBreaker) exec(s SideEffect) {
207	if s == nil {
208		return
209	}
210	go func() {
211		if err := s.Exec(); err != nil {
212			c.log.Errorf("%v side effect failure: %v", c, err)
213		}
214	}()
215}
216
217func (c *CircuitBreaker) setState(new cbState, until time.Time) {
218	c.log.Debugf("%v setting state to %v, until %v", c, new, until)
219	c.state = new
220	c.until = until
221	switch new {
222	case stateTripped:
223		c.exec(c.onTripped)
224	case stateStandby:
225		c.exec(c.onStandby)
226	}
227}
228
229func (c *CircuitBreaker) timeToCheck() bool {
230	c.m.RLock()
231	defer c.m.RUnlock()
232	return c.clock.UtcNow().After(c.lastCheck)
233}
234
235// Checks if tripping condition matches and sets circuit breaker to the tripped state
236func (c *CircuitBreaker) checkAndSet() {
237	if !c.timeToCheck() {
238		return
239	}
240
241	c.m.Lock()
242	defer c.m.Unlock()
243
244	// Other goroutine could have updated the lastCheck variable before we grabbed mutex
245	if !c.clock.UtcNow().After(c.lastCheck) {
246		return
247	}
248	c.lastCheck = c.clock.UtcNow().Add(c.checkPeriod)
249
250	if c.state == stateTripped {
251		c.log.Debugf("%v skip set tripped", c)
252		return
253	}
254
255	if !c.condition(c) {
256		return
257	}
258
259	c.setState(stateTripped, c.clock.UtcNow().Add(c.fallbackDuration))
260	c.metrics.Reset()
261}
262
263func (c *CircuitBreaker) setRecovering() {
264	c.setState(stateRecovering, c.clock.UtcNow().Add(c.recoveryDuration))
265	c.rc = newRatioController(c.clock, c.recoveryDuration, c.log)
266}
267
268// CircuitBreakerOption represents an option you can pass to New.
269// See the documentation for the individual options below.
270type CircuitBreakerOption func(*CircuitBreaker) error
271
272// Clock allows you to fake che CircuitBreaker's view of the current time.
273// Intended for unit tests.
274func Clock(clock timetools.TimeProvider) CircuitBreakerOption {
275	return func(c *CircuitBreaker) error {
276		c.clock = clock
277		return nil
278	}
279}
280
281// FallbackDuration is how long the CircuitBreaker will remain in the Tripped
282// state before trying to recover.
283func FallbackDuration(d time.Duration) CircuitBreakerOption {
284	return func(c *CircuitBreaker) error {
285		c.fallbackDuration = d
286		return nil
287	}
288}
289
290// RecoveryDuration is how long the CircuitBreaker will take to ramp up
291// requests during the Recovering state.
292func RecoveryDuration(d time.Duration) CircuitBreakerOption {
293	return func(c *CircuitBreaker) error {
294		c.recoveryDuration = d
295		return nil
296	}
297}
298
299// CheckPeriod is how long the CircuitBreaker will wait between successive
300// checks of the breaker condition.
301func CheckPeriod(d time.Duration) CircuitBreakerOption {
302	return func(c *CircuitBreaker) error {
303		c.checkPeriod = d
304		return nil
305	}
306}
307
308// OnTripped sets a SideEffect to run when entering the Tripped state.
309// Only one SideEffect can be set for this hook.
310func OnTripped(s SideEffect) CircuitBreakerOption {
311	return func(c *CircuitBreaker) error {
312		c.onTripped = s
313		return nil
314	}
315}
316
317// OnStandby sets a SideEffect to run when entering the Standby state.
318// Only one SideEffect can be set for this hook.
319func OnStandby(s SideEffect) CircuitBreakerOption {
320	return func(c *CircuitBreaker) error {
321		c.onStandby = s
322		return nil
323	}
324}
325
326// Fallback defines the http.Handler that the CircuitBreaker should route
327// requests to when it prevents a request from taking its normal path.
328func Fallback(h http.Handler) CircuitBreakerOption {
329	return func(c *CircuitBreaker) error {
330		c.fallback = h
331		return nil
332	}
333}
334
// cbState enumerates the states of the circuit breaker state machine.
type cbState int

// String returns the lowercase name of the state, or "undefined" for any
// value that is not one of the known states.
func (s cbState) String() string {
	names := [...]string{
		stateStandby:    "standby",
		stateTripped:    "tripped",
		stateRecovering: "recovering",
	}
	if s < 0 || int(s) >= len(names) {
		return "undefined"
	}
	return names[s]
}

const (
	// stateStandby: the breaker passes every request and watches the stats.
	stateStandby = iota
	// stateTripped: the breaker routes every request to the fallback.
	stateTripped
	// stateRecovering: the breaker lets some requests through and routes the
	// rest to the fallback.
	stateRecovering
)
358
// Defaults installed by New before options are applied.
const (
	// defaultCheckPeriod is the minimum interval between condition checks.
	defaultCheckPeriod = time.Second / 10
	// defaultFallbackDuration is how long the breaker stays tripped.
	defaultFallbackDuration = 10 * time.Second
	// defaultRecoveryDuration is how long the breaker spends recovering.
	defaultRecoveryDuration = 10 * time.Second
)
364
// defaultFallback is the handler New installs when no Fallback option is given.
var defaultFallback = &fallback{}

// fallback is the built-in fallback handler: it answers every request with
// 503 Service Unavailable and the standard status text as the body.
type fallback struct{}

// ServeHTTP writes the 503 status followed by its standard status text.
func (f *fallback) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	const code = http.StatusServiceUnavailable
	w.WriteHeader(code)
	// Once the status is sent, a failed body write cannot be reported to
	// anyone, so the error is deliberately ignored.
	_, _ = w.Write([]byte(http.StatusText(code)))
}
373