1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
// Package runutil provides helpers for advanced function scheduling control, such as repeat or retry.
5//
// It is very often the case that you need to execute some code at a fixed interval or have it retried automatically.
// To do this reliably with a proper timeout, you need to carefully arrange some boilerplate.
// The functions below do it for you.
9//
// For repeated execution, use Repeat:
11//
12// 	err := runutil.Repeat(10*time.Second, stopc, func() error {
13// 		// ...
14// 	})
15//
16// Retry starts executing closure function f until no error is returned from f:
17//
18// 	err := runutil.Retry(10*time.Second, stopc, func() error {
19// 		// ...
20// 	})
21//
22// For logging an error on each f error, use RetryWithLog:
23//
24// 	err := runutil.RetryWithLog(logger, 10*time.Second, stopc, func() error {
25// 		// ...
26// 	})
27//
// Another use case for the runutil package is closing a `Closer`. As we all know, we should close every implementation of `Closer`, such as *os.File. Commonly we will use:
29//
30// 	defer closer.Close()
31//
// The problem is that Close() can return an important error, e.g. for os.File the actual file flush might happen (and fail) in the `Close` method. It's important to *always* check the error. Thanos provides utility functions to log every such error, allowing you to put them in a convenient `defer`:
33//
34// 	defer runutil.CloseWithLogOnErr(logger, closer, "log format message")
35//
// To capture the error instead of logging it, use CloseWithErrCapture:
37//
38// 	var err error
39// 	defer runutil.CloseWithErrCapture(&err, closer, "log format message")
40//
41// 	// ...
42//
// If Close() returns an error, it is captured in err and so returned to the caller through that argument.
44//
// The runutil.Exhaust* family of functions provides the same functionality, but
// they take an io.ReadCloser and exhaust the whole reader before closing
// it. They are useful when trying to use HTTP keep-alive connections because
// for the same connection to be re-used the whole response body needs to be
// exhausted.
50package runutil
51
52import (
53	"fmt"
54	"io"
55	"io/ioutil"
56	"os"
57	"time"
58
59	"github.com/go-kit/kit/log"
60	"github.com/go-kit/kit/log/level"
61	"github.com/pkg/errors"
62	tsdberrors "github.com/prometheus/prometheus/tsdb/errors"
63)
64
// Repeat calls f once immediately and then again on every tick of interval.
// It returns nil once stopc is closed, or the error from f as soon as f fails.
// stopc is only checked between calls, so f always runs at least once.
func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// The first run happens before any waiting; later runs are triggered by the ticker.
	runErr := f()
	for runErr == nil {
		select {
		case <-ticker.C:
			runErr = f()
		case <-stopc:
			return nil
		}
	}
	return runErr
}
82
83// Retry executes f every interval seconds until timeout or no error is returned from f.
84func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
85	return RetryWithLog(log.NewNopLogger(), interval, stopc, f)
86}
87
88// RetryWithLog executes f every interval seconds until timeout or no error is returned from f. It logs an error on each f error.
89func RetryWithLog(logger log.Logger, interval time.Duration, stopc <-chan struct{}, f func() error) error {
90	tick := time.NewTicker(interval)
91	defer tick.Stop()
92
93	var err error
94	for {
95		if err = f(); err == nil {
96			return nil
97		}
98		level.Error(logger).Log("msg", "function failed. Retrying in next tick", "err", err)
99		select {
100		case <-stopc:
101			return err
102		case <-tick.C:
103		}
104	}
105}
106
107// CloseWithLogOnErr is making sure we log every error, even those from best effort tiny closers.
108func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...interface{}) {
109	err := closer.Close()
110	if err == nil {
111		return
112	}
113
114	if logger == nil {
115		logger = log.NewLogfmtLogger(os.Stderr)
116	}
117
118	level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...)))
119}
120
121// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before.
122func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) {
123	_, err := io.Copy(ioutil.Discard, r)
124	if err != nil {
125		level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err)
126	}
127
128	CloseWithLogOnErr(logger, r, format, a...)
129}
130
131// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
132// from caller function).
133func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
134	merr := tsdberrors.MultiError{}
135
136	merr.Add(*err)
137	merr.Add(errors.Wrapf(closer.Close(), format, a...))
138
139	*err = merr.Err()
140}
141
142// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before.
143func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) {
144	_, copyErr := io.Copy(ioutil.Discard, r)
145
146	CloseWithErrCapture(err, r, format, a...)
147
148	// Prepend the io.Copy error.
149	merr := tsdberrors.MultiError{}
150	merr.Add(copyErr)
151	merr.Add(*err)
152
153	*err = merr.Err()
154}
155