1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4// Package runutil provides helpers to advanced function scheduling control like repeat or retry. 5// 6// It's very often the case when you need to excutes some code every fixed intervals or have it retried automatically. 7// To make it reliably with proper timeout, you need to carefully arrange some boilerplate for this. 8// Below function does it for you. 9// 10// For repeat executes, use Repeat: 11// 12// err := runutil.Repeat(10*time.Second, stopc, func() error { 13// // ... 14// }) 15// 16// Retry starts executing closure function f until no error is returned from f: 17// 18// err := runutil.Retry(10*time.Second, stopc, func() error { 19// // ... 20// }) 21// 22// For logging an error on each f error, use RetryWithLog: 23// 24// err := runutil.RetryWithLog(logger, 10*time.Second, stopc, func() error { 25// // ... 26// }) 27// 28// Another use case for runutil package is when you want to close a `Closer` interface. As we all know, we should close all implements of `Closer`, such as *os.File. Commonly we will use: 29// 30// defer closer.Close() 31// 32// The problem is that Close() usually can return important error e.g for os.File the actual file flush might happen (and fail) on `Close` method. It's important to *always* check error. Thanos provides utility functions to log every error like those, allowing to put them in convenient `defer`: 33// 34// defer runutil.CloseWithLogOnErr(logger, closer, "log format message") 35// 36// For capturing error, use CloseWithErrCapture: 37// 38// var err error 39// defer runutil.CloseWithErrCapture(&err, closer, "log format message") 40// 41// // ... 42// 43// If Close() returns error, err will capture it and return by argument. 44// 45// The rununtil.Exhaust* family of functions provide the same functionality but 46// they take an io.ReadCloser and they exhaust the whole reader before closing 47// them. They are useful when trying to use http keep-alive connections because 48// for the same connection to be re-used the whole response body needs to be 49// exhausted. 50package runutil 51 52import ( 53 "fmt" 54 "io" 55 "io/ioutil" 56 "os" 57 "time" 58 59 "github.com/go-kit/kit/log" 60 "github.com/go-kit/kit/log/level" 61 "github.com/pkg/errors" 62 tsdberrors "github.com/prometheus/prometheus/tsdb/errors" 63) 64 65// Repeat executes f every interval seconds until stopc is closed or f returns an error. 66// It executes f once right after being called. 67func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error { 68 tick := time.NewTicker(interval) 69 defer tick.Stop() 70 71 for { 72 if err := f(); err != nil { 73 return err 74 } 75 select { 76 case <-stopc: 77 return nil 78 case <-tick.C: 79 } 80 } 81} 82 83// Retry executes f every interval seconds until timeout or no error is returned from f. 84func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error { 85 return RetryWithLog(log.NewNopLogger(), interval, stopc, f) 86} 87 88// RetryWithLog executes f every interval seconds until timeout or no error is returned from f. It logs an error on each f error. 89func RetryWithLog(logger log.Logger, interval time.Duration, stopc <-chan struct{}, f func() error) error { 90 tick := time.NewTicker(interval) 91 defer tick.Stop() 92 93 var err error 94 for { 95 if err = f(); err == nil { 96 return nil 97 } 98 level.Error(logger).Log("msg", "function failed. Retrying in next tick", "err", err) 99 select { 100 case <-stopc: 101 return err 102 case <-tick.C: 103 } 104 } 105} 106 107// CloseWithLogOnErr is making sure we log every error, even those from best effort tiny closers. 108func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...interface{}) { 109 err := closer.Close() 110 if err == nil { 111 return 112 } 113 114 if logger == nil { 115 logger = log.NewLogfmtLogger(os.Stderr) 116 } 117 118 level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...))) 119} 120 121// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. 122func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) { 123 _, err := io.Copy(ioutil.Discard, r) 124 if err != nil { 125 level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err) 126 } 127 128 CloseWithLogOnErr(logger, r, format, a...) 129} 130 131// CloseWithErrCapture runs function and on error return error by argument including the given error (usually 132// from caller function). 133func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) { 134 merr := tsdberrors.MultiError{} 135 136 merr.Add(*err) 137 merr.Add(errors.Wrapf(closer.Close(), format, a...)) 138 139 *err = merr.Err() 140} 141 142// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. 143func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) { 144 _, copyErr := io.Copy(ioutil.Discard, r) 145 146 CloseWithErrCapture(err, r, format, a...) 147 148 // Prepend the io.Copy error. 149 merr := tsdberrors.MultiError{} 150 merr.Add(copyErr) 151 merr.Add(*err) 152 153 *err = merr.Err() 154} 155