1// Package cbreaker implements circuit breaker similar to https://github.com/Netflix/Hystrix/wiki/How-it-Works 2// 3// Vulcan circuit breaker watches the error condtion to match 4// after which it activates the fallback scenario, e.g. returns the response code 5// or redirects the request to another location 6// 7// Circuit breakers start in the Standby state first, observing responses and watching location metrics. 8// 9// Once the Circuit breaker condition is met, it enters the "Tripped" state, where it activates fallback scenario 10// for all requests during the FallbackDuration time period and reset the stats for the location. 11// 12// After FallbackDuration time period passes, Circuit breaker enters "Recovering" state, during that state it will 13// start passing some traffic back to the endpoints, increasing the amount of passed requests using linear function: 14// 15// allowedRequestsRatio = 0.5 * (Now() - StartRecovery())/RecoveryDuration 16// 17// Two scenarios are possible in the "Recovering" state: 18// 1. Condition matches again, this will reset the state to "Tripped" and reset the timer. 19// 2. Condition does not match, circuit breaker enters "Standby" state 20// 21// It is possible to define actions (e.g. webhooks) of transitions between states: 22// 23// * OnTripped action is called on transition (Standby -> Tripped) 24// * OnStandby action is called on transition (Recovering -> Standby) 25// 26package cbreaker 27 28import ( 29 "fmt" 30 "net/http" 31 "sync" 32 "time" 33 34 "github.com/mailgun/timetools" 35 log "github.com/sirupsen/logrus" 36 "github.com/vulcand/oxy/memmetrics" 37 "github.com/vulcand/oxy/utils" 38) 39 40// CircuitBreaker is http.Handler that implements circuit breaker pattern 41type CircuitBreaker struct { 42 m *sync.RWMutex 43 metrics *memmetrics.RTMetrics 44 45 condition hpredicate 46 47 fallbackDuration time.Duration 48 recoveryDuration time.Duration 49 50 onTripped SideEffect 51 onStandby SideEffect 52 53 state cbState 54 until time.Time 55 56 rc *ratioController 57 58 checkPeriod time.Duration 59 lastCheck time.Time 60 61 fallback http.Handler 62 next http.Handler 63 64 clock timetools.TimeProvider 65 66 log *log.Logger 67} 68 69// New creates a new CircuitBreaker middleware 70func New(next http.Handler, expression string, options ...CircuitBreakerOption) (*CircuitBreaker, error) { 71 cb := &CircuitBreaker{ 72 m: &sync.RWMutex{}, 73 next: next, 74 // Default values. Might be overwritten by options below. 75 clock: &timetools.RealTime{}, 76 checkPeriod: defaultCheckPeriod, 77 fallbackDuration: defaultFallbackDuration, 78 recoveryDuration: defaultRecoveryDuration, 79 fallback: defaultFallback, 80 log: log.StandardLogger(), 81 } 82 83 for _, s := range options { 84 if err := s(cb); err != nil { 85 return nil, err 86 } 87 } 88 89 condition, err := parseExpression(expression) 90 if err != nil { 91 return nil, err 92 } 93 cb.condition = condition 94 95 mt, err := memmetrics.NewRTMetrics() 96 if err != nil { 97 return nil, err 98 } 99 cb.metrics = mt 100 101 return cb, nil 102} 103 104// Logger defines the logger the circuit breaker will use. 105// 106// It defaults to logrus.StandardLogger(), the global logger used by logrus. 107func Logger(l *log.Logger) CircuitBreakerOption { 108 return func(c *CircuitBreaker) error { 109 c.log = l 110 return nil 111 } 112} 113 114func (c *CircuitBreaker) ServeHTTP(w http.ResponseWriter, req *http.Request) { 115 if c.log.Level >= log.DebugLevel { 116 logEntry := c.log.WithField("Request", utils.DumpHttpRequest(req)) 117 logEntry.Debug("vulcand/oxy/circuitbreaker: begin ServeHttp on request") 118 defer logEntry.Debug("vulcand/oxy/circuitbreaker: completed ServeHttp on request") 119 } 120 if c.activateFallback(w, req) { 121 c.fallback.ServeHTTP(w, req) 122 return 123 } 124 c.serve(w, req) 125} 126 127// Fallback sets the fallback handler to be called by circuit breaker handler. 128func (c *CircuitBreaker) Fallback(f http.Handler) { 129 c.fallback = f 130} 131 132// Wrap sets the next handler to be called by circuit breaker handler. 133func (c *CircuitBreaker) Wrap(next http.Handler) { 134 c.next = next 135} 136 137// updateState updates internal state and returns true if fallback should be used and false otherwise 138func (c *CircuitBreaker) activateFallback(w http.ResponseWriter, req *http.Request) bool { 139 // Quick check with read locks optimized for normal operation use-case 140 if c.isStandby() { 141 return false 142 } 143 // Circuit breaker is in tripped or recovering state 144 c.m.Lock() 145 defer c.m.Unlock() 146 147 c.log.Warnf("%v is in error state", c) 148 149 switch c.state { 150 case stateStandby: 151 // someone else has set it to standby just now 152 return false 153 case stateTripped: 154 if c.clock.UtcNow().Before(c.until) { 155 return true 156 } 157 // We have been in active state enough, enter recovering state 158 c.setRecovering() 159 fallthrough 160 case stateRecovering: 161 // We have been in recovering state enough, enter standby and allow request 162 if c.clock.UtcNow().After(c.until) { 163 c.setState(stateStandby, c.clock.UtcNow()) 164 return false 165 } 166 // ratio controller allows this request 167 if c.rc.allowRequest() { 168 return false 169 } 170 return true 171 } 172 return false 173} 174 175func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) { 176 start := c.clock.UtcNow() 177 p := utils.NewProxyWriterWithLogger(w, c.log) 178 179 c.next.ServeHTTP(p, req) 180 181 latency := c.clock.UtcNow().Sub(start) 182 c.metrics.Record(p.StatusCode(), latency) 183 184 // Note that this call is less expensive than it looks -- checkCondition only performs the real check 185 // periodically. Because of that we can afford to call it here on every single response. 186 c.checkAndSet() 187} 188 189func (c *CircuitBreaker) isStandby() bool { 190 c.m.RLock() 191 defer c.m.RUnlock() 192 return c.state == stateStandby 193} 194 195// String returns log-friendly representation of the circuit breaker state 196func (c *CircuitBreaker) String() string { 197 switch c.state { 198 case stateTripped, stateRecovering: 199 return fmt.Sprintf("CircuitBreaker(state=%v, until=%v)", c.state, c.until) 200 default: 201 return fmt.Sprintf("CircuitBreaker(state=%v)", c.state) 202 } 203} 204 205// exec executes side effect 206func (c *CircuitBreaker) exec(s SideEffect) { 207 if s == nil { 208 return 209 } 210 go func() { 211 if err := s.Exec(); err != nil { 212 c.log.Errorf("%v side effect failure: %v", c, err) 213 } 214 }() 215} 216 217func (c *CircuitBreaker) setState(new cbState, until time.Time) { 218 c.log.Debugf("%v setting state to %v, until %v", c, new, until) 219 c.state = new 220 c.until = until 221 switch new { 222 case stateTripped: 223 c.exec(c.onTripped) 224 case stateStandby: 225 c.exec(c.onStandby) 226 } 227} 228 229func (c *CircuitBreaker) timeToCheck() bool { 230 c.m.RLock() 231 defer c.m.RUnlock() 232 return c.clock.UtcNow().After(c.lastCheck) 233} 234 235// Checks if tripping condition matches and sets circuit breaker to the tripped state 236func (c *CircuitBreaker) checkAndSet() { 237 if !c.timeToCheck() { 238 return 239 } 240 241 c.m.Lock() 242 defer c.m.Unlock() 243 244 // Other goroutine could have updated the lastCheck variable before we grabbed mutex 245 if !c.clock.UtcNow().After(c.lastCheck) { 246 return 247 } 248 c.lastCheck = c.clock.UtcNow().Add(c.checkPeriod) 249 250 if c.state == stateTripped { 251 c.log.Debugf("%v skip set tripped", c) 252 return 253 } 254 255 if !c.condition(c) { 256 return 257 } 258 259 c.setState(stateTripped, c.clock.UtcNow().Add(c.fallbackDuration)) 260 c.metrics.Reset() 261} 262 263func (c *CircuitBreaker) setRecovering() { 264 c.setState(stateRecovering, c.clock.UtcNow().Add(c.recoveryDuration)) 265 c.rc = newRatioController(c.clock, c.recoveryDuration, c.log) 266} 267 268// CircuitBreakerOption represents an option you can pass to New. 269// See the documentation for the individual options below. 270type CircuitBreakerOption func(*CircuitBreaker) error 271 272// Clock allows you to fake che CircuitBreaker's view of the current time. 273// Intended for unit tests. 274func Clock(clock timetools.TimeProvider) CircuitBreakerOption { 275 return func(c *CircuitBreaker) error { 276 c.clock = clock 277 return nil 278 } 279} 280 281// FallbackDuration is how long the CircuitBreaker will remain in the Tripped 282// state before trying to recover. 283func FallbackDuration(d time.Duration) CircuitBreakerOption { 284 return func(c *CircuitBreaker) error { 285 c.fallbackDuration = d 286 return nil 287 } 288} 289 290// RecoveryDuration is how long the CircuitBreaker will take to ramp up 291// requests during the Recovering state. 292func RecoveryDuration(d time.Duration) CircuitBreakerOption { 293 return func(c *CircuitBreaker) error { 294 c.recoveryDuration = d 295 return nil 296 } 297} 298 299// CheckPeriod is how long the CircuitBreaker will wait between successive 300// checks of the breaker condition. 301func CheckPeriod(d time.Duration) CircuitBreakerOption { 302 return func(c *CircuitBreaker) error { 303 c.checkPeriod = d 304 return nil 305 } 306} 307 308// OnTripped sets a SideEffect to run when entering the Tripped state. 309// Only one SideEffect can be set for this hook. 310func OnTripped(s SideEffect) CircuitBreakerOption { 311 return func(c *CircuitBreaker) error { 312 c.onTripped = s 313 return nil 314 } 315} 316 317// OnStandby sets a SideEffect to run when entering the Standby state. 318// Only one SideEffect can be set for this hook. 319func OnStandby(s SideEffect) CircuitBreakerOption { 320 return func(c *CircuitBreaker) error { 321 c.onStandby = s 322 return nil 323 } 324} 325 326// Fallback defines the http.Handler that the CircuitBreaker should route 327// requests to when it prevents a request from taking its normal path. 328func Fallback(h http.Handler) CircuitBreakerOption { 329 return func(c *CircuitBreaker) error { 330 c.fallback = h 331 return nil 332 } 333} 334 335// cbState is the state of the circuit breaker 336type cbState int 337 338func (s cbState) String() string { 339 switch s { 340 case stateStandby: 341 return "standby" 342 case stateTripped: 343 return "tripped" 344 case stateRecovering: 345 return "recovering" 346 } 347 return "undefined" 348} 349 350const ( 351 // CircuitBreaker is passing all requests and watching stats 352 stateStandby = iota 353 // CircuitBreaker activates fallback scenario for all requests 354 stateTripped 355 // CircuitBreaker passes some requests to go through, rejecting others 356 stateRecovering 357) 358 359const ( 360 defaultFallbackDuration = 10 * time.Second 361 defaultRecoveryDuration = 10 * time.Second 362 defaultCheckPeriod = 100 * time.Millisecond 363) 364 365var defaultFallback = &fallback{} 366 367type fallback struct{} 368 369func (f *fallback) ServeHTTP(w http.ResponseWriter, req *http.Request) { 370 w.WriteHeader(http.StatusServiceUnavailable) 371 w.Write([]byte(http.StatusText(http.StatusServiceUnavailable))) 372} 373