// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//       http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/jaegertracing/jaeger/pkg/queue"
	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
	"go.opencensus.io/metric/metricproducer"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

var (
	r = metric.NewRegistry()

	queueSizeGauge, _ = r.AddInt64DerivedGauge(
		obsmetrics.ExporterKey+"/queue_size",
		metric.WithDescription("Current size of the retry queue (in batches)"),
		metric.WithLabelKeys(obsmetrics.ExporterKey),
		metric.WithUnit(metricdata.UnitDimensionless))

	errSendingQueueIsFull = errors.New("sending_queue is full")
)

func init() {
	metricproducer.GlobalManager().AddProducer(r)
}

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
	// Enabled indicates whether to enqueue batches before sending to the consumerSender.
	Enabled bool `mapstructure:"enabled"`
	// NumConsumers is the number of consumers from the queue.
	NumConsumers int `mapstructure:"num_consumers"`
	// QueueSize is the maximum number of batches allowed in the queue at a given time.
	QueueSize int `mapstructure:"queue_size"`
}

// DefaultQueueSettings returns the default settings for QueueSettings.
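// As a rough sketch (the numbers below are illustrative assumptions, not a
// recommendation), a caller that wants to buffer a longer backend outage can start
// from these defaults and size the queue as the expected requests per second
// multiplied by the outage window to absorb:
//
//	qCfg := DefaultQueueSettings()
//	qCfg.QueueSize = 10000 // e.g. ~100 requests/sec * ~100 sec outage (hypothetical load)
//	qCfg.NumConsumers = 20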
func DefaultQueueSettings() QueueSettings {
	return QueueSettings{
		Enabled:      true,
		NumConsumers: 10,
		// 5000 queue elements at 100 requests/sec gives about 50 sec of survival of a destination outage.
		// This is a reasonable default for production.
		// Users should calculate this value from how many seconds of traffic to buffer during a backend outage,
		// multiplied by the number of requests per second.
		QueueSize: 5000,
	}
}

// RetrySettings defines configuration for retrying batches in case of export failure.
// The currently supported strategy is exponential backoff.
type RetrySettings struct {
	// Enabled indicates whether to retry sending batches in case of export failure.
	Enabled bool `mapstructure:"enabled"`
	// InitialInterval is the time to wait after the first failure before retrying.
	InitialInterval time.Duration `mapstructure:"initial_interval"`
	// MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between
	// consecutive retries will always be `MaxInterval`.
	MaxInterval time.Duration `mapstructure:"max_interval"`
	// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
	// Once this value is reached, the data is discarded.
	MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
}

// DefaultRetrySettings returns the default settings for RetrySettings.
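// With these defaults and the backoff parameters used by retrySender below
// (multiplier 1.5 with up to 50% jitter), the nominal delays are roughly
// 5s, 7.5s, 11.25s, ..., capped at MaxInterval (30s), until MaxElapsedTime
// (5 minutes) is exceeded and the batch is dropped.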
func DefaultRetrySettings() RetrySettings {
	return RetrySettings{
		Enabled:         true,
		InitialInterval: 5 * time.Second,
		MaxInterval:     30 * time.Second,
		MaxElapsedTime:  5 * time.Minute,
	}
}

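// queuedRetrySender is a requestSender that buffers incoming requests in a bounded
// queue and hands them to the retry sender from a pool of consumer goroutines.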
type queuedRetrySender struct {
	fullName        string
	cfg             QueueSettings
	consumerSender  requestSender
	queue           *queue.BoundedQueue
	retryStopCh     chan struct{}
	traceAttributes []attribute.KeyValue
	logger          *zap.Logger
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
	if logger.Core().Enabled(zapcore.DebugLevel) {
		// Debugging is enabled. Don't do any sampling.
		return logger
	}

	// Create a logger that samples logging: within each 10-second interval it passes
	// the first occurrence of a message and thereafter only every 100th occurrence.
	opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
		return zapcore.NewSamplerWithOptions(
			core,
			10*time.Second,
			1,
			100,
		)
	})
	return logger.WithOptions(opts)
}

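// newQueuedRetrySender wires the sender chain: requests are enqueued by the
// queuedRetrySender, dequeued by the queue consumers, retried by the retrySender,
// and finally forwarded to nextSender.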
func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
	retryStopCh := make(chan struct{})
	sampledLogger := createSampledLogger(logger)
	traceAttr := attribute.String(obsmetrics.ExporterKey, fullName)
	return &queuedRetrySender{
		fullName: fullName,
		cfg:      qCfg,
		consumerSender: &retrySender{
			traceAttribute: traceAttr,
			cfg:            rCfg,
			nextSender:     nextSender,
			stopCh:         retryStopCh,
			logger:         sampledLogger,
		},
		queue:           queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}),
		retryStopCh:     retryStopCh,
		traceAttributes: []attribute.KeyValue{traceAttr},
		logger:          sampledLogger,
	}
}

// start is invoked during service startup.
func (qrs *queuedRetrySender) start() error {
	qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
		req := item.(request)
		_ = qrs.consumerSender.send(req)
	})

	// Start reporting the queue size metric.
	if qrs.cfg.Enabled {
		err := queueSizeGauge.UpsertEntry(func() int64 {
			return int64(qrs.queue.Size())
		}, metricdata.NewLabelValue(qrs.fullName))
		if err != nil {
			return fmt.Errorf("failed to create retry queue size metric: %v", err)
		}
	}

	return nil
}

// send implements the requestSender interface
func (qrs *queuedRetrySender) send(req request) error {
	if !qrs.cfg.Enabled {
		err := qrs.consumerSender.send(req)
		if err != nil {
			qrs.logger.Error(
				"Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
				zap.Int("dropped_items", req.count()),
			)
		}
		return err
	}

	// Prevent cancellation and deadline from propagating to the context stored in the queue.
	// The grpc/http based receivers will cancel the request context after this function returns.
	req.setContext(noCancellationContext{Context: req.context()})

	span := trace.SpanFromContext(req.context())
	if !qrs.queue.Produce(req) {
		qrs.logger.Error(
			"Dropping data because sending_queue is full. Try increasing queue_size.",
			zap.Int("dropped_items", req.count()),
		)
		span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttributes...))
		return errSendingQueueIsFull
	}

	span.AddEvent("Enqueued item.", trace.WithAttributes(qrs.traceAttributes...))
	return nil
}

// shutdown is invoked during service shutdown.
func (qrs *queuedRetrySender) shutdown() {
	// Clean up the queue size metric reporting.
	if qrs.cfg.Enabled {
		_ = queueSizeGauge.UpsertEntry(func() int64 {
			return int64(0)
		}, metricdata.NewLabelValue(qrs.fullName))
	}

	// First, stop the retry goroutines so that the queue workers are unblocked.
	close(qrs.retryStopCh)

	// Stop the queued sender; this drains the queue and hands the remaining requests to the
	// retry sender, which is already stopped and therefore tries each request only once.
	qrs.queue.Stop()
}

// TODO: Clean this up by forcing all exporters to return an internal error type that
// always includes the retry information.
type throttleRetry struct {
	err   error
	delay time.Duration
}

func (t throttleRetry) Error() string {
	return "Throttle (" + t.delay.String() + "), error: " + t.err.Error()
}

func (t throttleRetry) Unwrap() error {
	return t.err
}

// NewThrottleRetry creates a new throttle retry error.
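// When the returned error reaches retrySender, the given delay is used as a lower
// bound for the next backoff interval. A sketch of how an exporter might wrap a
// server-advertised delay (retryDelayFromError is a hypothetical helper, not part
// of this package):
//
//	if delay, ok := retryDelayFromError(err); ok {
//		return NewThrottleRetry(err, delay)
//	}
//	return err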
func NewThrottleRetry(err error, delay time.Duration) error {
	return throttleRetry{
		err:   err,
		delay: delay,
	}
}

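// retrySender is a requestSender that retries failed sends with exponential backoff,
// honors throttling delays requested via NewThrottleRetry, and stops early on
// permanent errors, shutdown, or context cancellation.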
type retrySender struct {
	traceAttribute attribute.KeyValue
	cfg            RetrySettings
	nextSender     requestSender
	stopCh         chan struct{}
	logger         *zap.Logger
}

// send implements the requestSender interface
func (rs *retrySender) send(req request) error {
	if !rs.cfg.Enabled {
		err := rs.nextSender.send(req)
		if err != nil {
			rs.logger.Error(
				"Exporting failed. Try enabling retry_on_failure config option.",
				zap.Error(err),
			)
		}
		return err
	}

	// Do not use NewExponentialBackOff since it calls Reset and the code here must
	// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
	expBackoff := backoff.ExponentialBackOff{
		InitialInterval:     rs.cfg.InitialInterval,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		MaxInterval:         rs.cfg.MaxInterval,
		MaxElapsedTime:      rs.cfg.MaxElapsedTime,
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}
	expBackoff.Reset()
	span := trace.SpanFromContext(req.context())
	retryNum := int64(0)
	for {
		span.AddEvent(
			"Sending request.",
			trace.WithAttributes(rs.traceAttribute, attribute.Int64("retry_num", retryNum)))

		err := rs.nextSender.send(req)
		if err == nil {
			return nil
		}

		// Immediately drop data on permanent errors.
		if consumererror.IsPermanent(err) {
			rs.logger.Error(
				"Exporting failed. The error is not retryable. Dropping data.",
				zap.Error(err),
				zap.Int("dropped_items", req.count()),
			)
			return err
		}

		// Give the request a chance to extract the subset of data that failed, so that
		// only that subset is retried when the error reports a partial failure.
		req = req.onError(err)

		backoffDelay := expBackoff.NextBackOff()
		if backoffDelay == backoff.Stop {
			// Throw away the batch.
			err = fmt.Errorf("max elapsed time expired %w", err)
			rs.logger.Error(
				"Exporting failed. No more retries left. Dropping data.",
				zap.Error(err),
				zap.Int("dropped_items", req.count()),
			)
			return err
		}

		throttleErr := throttleRetry{}
		isThrottle := errors.As(err, &throttleErr)
		if isThrottle {
			backoffDelay = max(backoffDelay, throttleErr.delay)
		}

		backoffDelayStr := backoffDelay.String()
		span.AddEvent(
			"Exporting failed. Will retry the request after interval.",
			trace.WithAttributes(
				rs.traceAttribute,
				attribute.String("interval", backoffDelayStr),
				attribute.String("error", err.Error())))
		rs.logger.Info(
			"Exporting failed. Will retry the request after interval.",
			zap.Error(err),
			zap.String("interval", backoffDelayStr),
		)
		retryNum++

		// Back off, but get interrupted on shutdown or if the request is cancelled or times out.
		select {
		case <-req.context().Done():
			return fmt.Errorf("request is cancelled or timed out %w", err)
		case <-rs.stopCh:
			return fmt.Errorf("interrupted due to shutdown %w", err)
		case <-time.After(backoffDelay):
		}
	}
}

// max returns the larger of x or y.
func max(x, y time.Duration) time.Duration {
	if x < y {
		return y
	}
	return x
}

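// noCancellationContext wraps a context.Context but hides its deadline and
// cancellation signal, so a request stored in the queue keeps the parent's values
// while outliving the receiver's request-scoped context.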
type noCancellationContext struct {
	context.Context
}

func (noCancellationContext) Deadline() (deadline time.Time, ok bool) {
	return
}

func (noCancellationContext) Done() <-chan struct{} {
	return nil
}

func (noCancellationContext) Err() error {
	return nil
}