// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/jaegertracing/jaeger/pkg/queue"
	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
	"go.opencensus.io/metric/metricproducer"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

var (
	r = metric.NewRegistry()

	queueSizeGauge, _ = r.AddInt64DerivedGauge(
		obsmetrics.ExporterKey+"/queue_size",
		metric.WithDescription("Current size of the retry queue (in batches)"),
		metric.WithLabelKeys(obsmetrics.ExporterKey),
		metric.WithUnit(metricdata.UnitDimensionless))

	errSendingQueueIsFull = errors.New("sending_queue is full")
)

func init() {
	metricproducer.GlobalManager().AddProducer(r)
}

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
	// Enabled indicates whether to enqueue batches before sending to the consumerSender.
	Enabled bool `mapstructure:"enabled"`
	// NumConsumers is the number of consumers from the queue.
	NumConsumers int `mapstructure:"num_consumers"`
	// QueueSize is the maximum number of batches allowed in the queue at a given time.
	QueueSize int `mapstructure:"queue_size"`
}

// DefaultQueueSettings returns the default settings for QueueSettings.
func DefaultQueueSettings() QueueSettings {
	return QueueSettings{
		Enabled:      true,
		NumConsumers: 10,
		// A queue of 5000 batches at 100 requests/sec gives about 50 sec of tolerance to a
		// destination outage, which is a reasonable value for production.
		// Users should size the queue as the number of seconds to buffer during a backend
		// outage multiplied by the number of requests per second.
		QueueSize: 5000,
	}
}

// RetrySettings defines configuration for retrying batches in case of export failure.
// The currently supported strategy is exponential backoff.
type RetrySettings struct {
	// Enabled indicates whether to retry sending batches in case of export failure.
	Enabled bool `mapstructure:"enabled"`
	// InitialInterval is the time to wait after the first failure before retrying.
	InitialInterval time.Duration `mapstructure:"initial_interval"`
	// MaxInterval is the upper bound on the backoff interval. Once this value is reached, the delay between
	// consecutive retries will always be `MaxInterval`.
	MaxInterval time.Duration `mapstructure:"max_interval"`
	// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
	// Once this value is reached, the data is discarded.
	MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
}

// DefaultRetrySettings returns the default settings for RetrySettings.
func DefaultRetrySettings() RetrySettings {
	return RetrySettings{
		Enabled:         true,
		InitialInterval: 5 * time.Second,
		MaxInterval:     30 * time.Second,
		MaxElapsedTime:  5 * time.Minute,
	}
}
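
// For illustration, what these settings look like in a Collector configuration
// file. This is a sketch: the "otlp" exporter name is just an example, and it
// assumes the exporter embeds QueueSettings under `sending_queue` and
// RetrySettings under `retry_on_failure`, as the log messages below suggest.
//
//	exporters:
//	  otlp:
//	    sending_queue:
//	      enabled: true
//	      num_consumers: 10
//	      queue_size: 5000
//	    retry_on_failure:
//	      enabled: true
//	      initial_interval: 5s
//	      max_interval: 30s
//	      max_elapsed_time: 5m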

type queuedRetrySender struct {
	fullName        string
	cfg             QueueSettings
	consumerSender  requestSender
	queue           *queue.BoundedQueue
	retryStopCh     chan struct{}
	traceAttributes []attribute.KeyValue
	logger          *zap.Logger
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
	if logger.Core().Enabled(zapcore.DebugLevel) {
		// Debugging is enabled. Don't do any sampling.
		return logger
	}

	// Create a logger that samples each message: the first occurrence within a
	// 10-second interval is logged, then only every 100th occurrence after that.
	opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
		return zapcore.NewSamplerWithOptions(
			core,
			10*time.Second,
			1,
			100,
		)
	})
	return logger.WithOptions(opts)
}

func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
	retryStopCh := make(chan struct{})
	sampledLogger := createSampledLogger(logger)
	traceAttr := attribute.String(obsmetrics.ExporterKey, fullName)
	return &queuedRetrySender{
		fullName: fullName,
		cfg:      qCfg,
		consumerSender: &retrySender{
			traceAttribute: traceAttr,
			cfg:            rCfg,
			nextSender:     nextSender,
			stopCh:         retryStopCh,
			logger:         sampledLogger,
		},
		// Drops are logged where Produce is called, so the queue's own dropped-item callback is a no-op.
		queue:           queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}),
		retryStopCh:     retryStopCh,
		traceAttributes: []attribute.KeyValue{traceAttr},
		logger:          sampledLogger,
	}
}
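
// Typical lifecycle, as wired up by the rest of exporterhelper (a sketch;
// "next" stands in for whatever requestSender performs the actual export):
//
//	qrs := newQueuedRetrySender("myexporter", DefaultQueueSettings(), DefaultRetrySettings(), next, logger)
//	if err := qrs.start(); err != nil {
//		// handle queue-size metric registration failure
//	}
//	_ = qrs.send(req) // enqueues req, or exports inline when the queue is disabled
//	qrs.shutdown()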

// start is invoked during service startup.
func (qrs *queuedRetrySender) start() error {
	qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
		req := item.(request)
		_ = qrs.consumerSender.send(req)
	})

	// Start reporting the queue length metric.
	if qrs.cfg.Enabled {
		err := queueSizeGauge.UpsertEntry(func() int64 {
			return int64(qrs.queue.Size())
		}, metricdata.NewLabelValue(qrs.fullName))
		if err != nil {
			return fmt.Errorf("failed to create retry queue size metric: %w", err)
		}
	}

	return nil
}

// send implements the requestSender interface.
func (qrs *queuedRetrySender) send(req request) error {
	if !qrs.cfg.Enabled {
		err := qrs.consumerSender.send(req)
		if err != nil {
			qrs.logger.Error(
				"Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
				zap.Int("dropped_items", req.count()),
			)
		}
		return err
	}

	// Prevent cancellation and deadline from propagating to the context stored in the queue.
	// The gRPC/HTTP-based receivers will cancel the request context after this function returns.
	req.setContext(noCancellationContext{Context: req.context()})

	span := trace.SpanFromContext(req.context())
	if !qrs.queue.Produce(req) {
		qrs.logger.Error(
			"Dropping data because sending_queue is full. Try increasing queue_size.",
			zap.Int("dropped_items", req.count()),
		)
		span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttributes...))
		return errSendingQueueIsFull
	}

	span.AddEvent("Enqueued item.", trace.WithAttributes(qrs.traceAttributes...))
	return nil
}

// shutdown is invoked during service shutdown.
func (qrs *queuedRetrySender) shutdown() {
	// Clean up queue metrics reporting.
	if qrs.cfg.Enabled {
		_ = queueSizeGauge.UpsertEntry(func() int64 {
			return int64(0)
		}, metricdata.NewLabelValue(qrs.fullName))
	}

	// First, stop the retry goroutines so that the queue workers are unblocked.
	close(qrs.retryStopCh)

	// Stop the queued sender; this drains the queue, handing each remaining request to the
	// retry sender, which (having been stopped) will attempt it only once.
	qrs.queue.Stop()
}

// TODO: Clean this up by forcing all exporters to return an internal error type that always
// includes the retry information.
type throttleRetry struct {
	err   error
	delay time.Duration
}

func (t throttleRetry) Error() string {
	return "Throttle (" + t.delay.String() + "), error: " + t.err.Error()
}

func (t throttleRetry) Unwrap() error {
	return t.err
}

// NewThrottleRetry creates a new throttle retry error.
func NewThrottleRetry(err error, delay time.Duration) error {
	return throttleRetry{
		err:   err,
		delay: delay,
	}
}
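
// For example (a sketch, not taken from a real exporter): an exporter whose
// backend answers with a throttling response such as HTTP 429 or gRPC
// ResourceExhausted, carrying a 5-second retry hint, could surface it as
//
//	return exporterhelper.NewThrottleRetry(err, 5*time.Second)
//
// and the retrySender below will then wait at least those 5 seconds before
// the next attempt.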

type retrySender struct {
	traceAttribute attribute.KeyValue
	cfg            RetrySettings
	nextSender     requestSender
	stopCh         chan struct{}
	logger         *zap.Logger
}

// send implements the requestSender interface.
func (rs *retrySender) send(req request) error {
	if !rs.cfg.Enabled {
		err := rs.nextSender.send(req)
		if err != nil {
			rs.logger.Error(
				"Exporting failed. Try enabling the retry_on_failure config option.",
				zap.Error(err),
			)
		}
		return err
	}

	// Do not use NewExponentialBackOff since it calls Reset, and the code here must
	// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
	expBackoff := backoff.ExponentialBackOff{
		InitialInterval:     rs.cfg.InitialInterval,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		MaxInterval:         rs.cfg.MaxInterval,
		MaxElapsedTime:      rs.cfg.MaxElapsedTime,
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}
	expBackoff.Reset()
	span := trace.SpanFromContext(req.context())
	retryNum := int64(0)
	for {
		span.AddEvent(
			"Sending request.",
			trace.WithAttributes(rs.traceAttribute, attribute.Int64("retry_num", retryNum)))

		err := rs.nextSender.send(req)
		if err == nil {
			return nil
		}

		// Immediately drop data on permanent errors.
		if consumererror.IsPermanent(err) {
			rs.logger.Error(
				"Exporting failed. The error is not retryable. Dropping data.",
				zap.Error(err),
				zap.Int("dropped_items", req.count()),
			)
			return err
		}

		// Give the request a chance to extract signal data to retry if only some data
		// failed to process.
		req = req.onError(err)

		backoffDelay := expBackoff.NextBackOff()
		if backoffDelay == backoff.Stop {
			// Throw away the batch.
			err = fmt.Errorf("max elapsed time expired: %w", err)
			rs.logger.Error(
				"Exporting failed. No more retries left. Dropping data.",
				zap.Error(err),
				zap.Int("dropped_items", req.count()),
			)
			return err
		}

		// Honor a server-requested throttling delay if it exceeds the computed backoff.
		throttleErr := throttleRetry{}
		isThrottle := errors.As(err, &throttleErr)
		if isThrottle {
			backoffDelay = max(backoffDelay, throttleErr.delay)
		}

		backoffDelayStr := backoffDelay.String()
		span.AddEvent(
			"Exporting failed. Will retry the request after interval.",
			trace.WithAttributes(
				rs.traceAttribute,
				attribute.String("interval", backoffDelayStr),
				attribute.String("error", err.Error())))
		rs.logger.Info(
			"Exporting failed. Will retry the request after interval.",
			zap.Error(err),
			zap.String("interval", backoffDelayStr),
		)
		retryNum++

		// Back off, but allow the wait to be interrupted by shutdown or by the request
		// being cancelled or timing out.
		select {
		case <-req.context().Done():
			return fmt.Errorf("request is cancelled or timed out: %w", err)
		case <-rs.stopCh:
			return fmt.Errorf("interrupted due to shutdown: %w", err)
		case <-time.After(backoffDelay):
		}
	}
}

// max returns the larger of x or y.
func max(x, y time.Duration) time.Duration {
	if x < y {
		return y
	}
	return x
}

// noCancellationContext wraps a context and hides its deadline, cancellation signal, and
// error, so that a request stored in the queue outlives the inbound call that produced it.
type noCancellationContext struct {
	context.Context
}

func (noCancellationContext) Deadline() (deadline time.Time, ok bool) {
	return
}

func (noCancellationContext) Done() <-chan struct{} {
	return nil
}

func (noCancellationContext) Err() error {
	return nil
}
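
// For intuition, the retry schedule produced under DefaultRetrySettings
// (a sketch; per the cenkalti/backoff defaults used above, each delay is
// multiplied by 1.5 and randomized by +/-50%):
//
//	attempt 1 fails -> wait ~5s     (initial_interval)
//	attempt 2 fails -> wait ~7.5s   (5s * 1.5)
//	attempt 3 fails -> wait ~11.25s
//	...                             (capped at max_interval = 30s)
//
// Retrying stops and the batch is dropped once the total time spent exceeds
// max_elapsed_time (5 minutes by default).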