// Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" "runtime" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel" ) const ( DefaultMaxQueueSize = 2048 DefaultBatchTimeout = 5000 * time.Millisecond DefaultExportTimeout = 30000 * time.Millisecond DefaultMaxExportBatchSize = 512 ) type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) type BatchSpanProcessorOptions struct { // MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the // queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior. // The default value of MaxQueueSize is 2048. MaxQueueSize int // BatchTimeout is the maximum duration for constructing a batch. Processor // forcefully sends available spans when timeout is reached. // The default value of BatchTimeout is 5000 msec. BatchTimeout time.Duration // ExportTimeout specifies the maximum duration for exporting spans. If the timeout // is reached, the export will be cancelled. // The default value of ExportTimeout is 30000 msec. ExportTimeout time.Duration // MaxExportBatchSize is the maximum number of spans to process in a single batch. // If there are more than one batch worth of spans then it processes multiple batches // of spans one batch after the other without any delay. // The default value of MaxExportBatchSize is 512. MaxExportBatchSize int // BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full // AND if BlockOnQueueFull is set to true. // Blocking option should be used carefully as it can severely affect the performance of an // application. BlockOnQueueFull bool } // batchSpanProcessor is a SpanProcessor that batches asynchronously-received // SpanSnapshots and sends them to a trace.Exporter when complete. type batchSpanProcessor struct { e SpanExporter o BatchSpanProcessorOptions queue chan *SpanSnapshot dropped uint32 batch []*SpanSnapshot batchMutex sync.Mutex timer *time.Timer stopWait sync.WaitGroup stopOnce sync.Once stopCh chan struct{} } var _ SpanProcessor = (*batchSpanProcessor)(nil) // NewBatchSpanProcessor creates a new SpanProcessor that will send completed // span batches to the exporter with the supplied options. // // If the exporter is nil, the span processor will preform no action. func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor { o := BatchSpanProcessorOptions{ BatchTimeout: DefaultBatchTimeout, ExportTimeout: DefaultExportTimeout, MaxQueueSize: DefaultMaxQueueSize, MaxExportBatchSize: DefaultMaxExportBatchSize, } for _, opt := range options { opt(&o) } bsp := &batchSpanProcessor{ e: exporter, o: o, batch: make([]*SpanSnapshot, 0, o.MaxExportBatchSize), timer: time.NewTimer(o.BatchTimeout), queue: make(chan *SpanSnapshot, o.MaxQueueSize), stopCh: make(chan struct{}), } bsp.stopWait.Add(1) go func() { defer bsp.stopWait.Done() bsp.processQueue() bsp.drainQueue() }() return bsp } // OnStart method does nothing. func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {} // OnEnd method enqueues a ReadOnlySpan for later processing. func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) { // Do not enqueue spans if we are just going to drop them. if bsp.e == nil { return } bsp.enqueue(s.Snapshot()) } // Shutdown flushes the queue and waits until all spans are processed. // It only executes once. Subsequent call does nothing. func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { var err error bsp.stopOnce.Do(func() { wait := make(chan struct{}) go func() { close(bsp.stopCh) bsp.stopWait.Wait() if bsp.e != nil { if err := bsp.e.Shutdown(ctx); err != nil { otel.Handle(err) } } close(wait) }() // Wait until the wait group is done or the context is cancelled select { case <-wait: case <-ctx.Done(): err = ctx.Err() } }) return err } // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { wait := make(chan struct{}) go func() { if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } close(wait) }() // Wait until the export is finished or the context is cancelled/timed out select { case <-wait: case <-ctx.Done(): err = ctx.Err() } } return err } func WithMaxQueueSize(size int) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.MaxQueueSize = size } } func WithMaxExportBatchSize(size int) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.MaxExportBatchSize = size } } func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.BatchTimeout = delay } } func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.ExportTimeout = timeout } } func WithBlocking() BatchSpanProcessorOption { return func(o *BatchSpanProcessorOptions) { o.BlockOnQueueFull = true } } // exportSpans is a subroutine of processing and draining the queue. func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { bsp.timer.Reset(bsp.o.BatchTimeout) bsp.batchMutex.Lock() defer bsp.batchMutex.Unlock() if bsp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) defer cancel() } if len(bsp.batch) > 0 { if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil { return err } bsp.batch = bsp.batch[:0] } return nil } // processQueue removes spans from the `queue` channel until processor // is shut down. It calls the exporter in batches of up to MaxExportBatchSize // waiting up to BatchTimeout to form a batch. func (bsp *batchSpanProcessor) processQueue() { defer bsp.timer.Stop() ctx, cancel := context.WithCancel(context.Background()) defer cancel() for { select { case <-bsp.stopCh: return case <-bsp.timer.C: if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } case sd := <-bsp.queue: bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize bsp.batchMutex.Unlock() if shouldExport { if !bsp.timer.Stop() { <-bsp.timer.C } if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } } } } } // drainQueue awaits the any caller that had added to bsp.stopWait // to finish the enqueue, then exports the final batch. func (bsp *batchSpanProcessor) drainQueue() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() for { select { case sd := <-bsp.queue: if sd == nil { if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } return } bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize bsp.batchMutex.Unlock() if shouldExport { if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) } } default: close(bsp.queue) } } } func (bsp *batchSpanProcessor) enqueue(sd *SpanSnapshot) { if !sd.SpanContext.IsSampled() { return } // This ensures the bsp.queue<- below does not panic as the // processor shuts down. defer func() { x := recover() switch err := x.(type) { case nil: return case runtime.Error: if err.Error() == "send on closed channel" { return } } panic(x) }() select { case <-bsp.stopCh: return default: } if bsp.o.BlockOnQueueFull { bsp.queue <- sd return } select { case bsp.queue <- sd: default: atomic.AddUint32(&bsp.dropped, 1) } }