1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package trace // import "go.opentelemetry.io/otel/sdk/trace"
16
17import (
18	"context"
19	"runtime"
20	"sync"
21	"sync/atomic"
22	"time"
23
24	"go.opentelemetry.io/otel"
25	export "go.opentelemetry.io/otel/sdk/export/trace"
26)
27
// Default settings used by NewBatchSpanProcessor for every value that is not
// overridden by an option.
const (
	// DefaultMaxQueueSize is the default capacity of the span queue.
	DefaultMaxQueueSize = 2048
	// DefaultBatchTimeout is the default maximum time a batch is collected
	// before it is exported (5000 msec).
	DefaultBatchTimeout = 5 * time.Second
	// DefaultMaxExportBatchSize is the default maximum number of spans
	// exported in a single batch.
	DefaultMaxExportBatchSize = 512
)
33
// BatchSpanProcessorOption is a function that modifies a
// BatchSpanProcessorOptions value. NewBatchSpanProcessor applies the supplied
// options, in order, on top of the defaults.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
35
// BatchSpanProcessorOptions holds the configuration of a BatchSpanProcessor.
type BatchSpanProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
	// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
	// The default value of MaxQueueSize is 2048.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available spans when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// MaxExportBatchSize is the maximum number of spans to process in a single batch.
	// If there are more than one batch worth of spans then it processes multiple batches
	// of spans one batch after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// BlockOnQueueFull makes OnEnd block while the queue is full instead of
	// dropping the span. OnStart does nothing and is therefore never affected.
	// Blocking option should be used carefully as it can severely affect the performance of an
	// application.
	BlockOnQueueFull bool
}
59
// BatchSpanProcessor is a SpanProcessor that batches asynchronously-received
// SpanSnapshots and sends them to a trace.Exporter when complete.
type BatchSpanProcessor struct {
	// e is the exporter that receives the batches. When it is nil, OnEnd
	// returns without enqueueing anything.
	e export.SpanExporter
	// o is the configuration in effect for this processor.
	o BatchSpanProcessorOptions

	// queue buffers snapshots between OnEnd and the worker goroutine; it is
	// created with capacity o.MaxQueueSize.
	queue chan *export.SpanSnapshot
	// dropped counts spans discarded because the queue was full. It is
	// incremented with atomic.AddUint32.
	dropped uint32

	// batch collects the spans for the next export; access is guarded by
	// batchMutex.
	batch      []*export.SpanSnapshot
	batchMutex sync.Mutex
	// timer triggers an export when o.BatchTimeout elapses; it is reset on
	// every call to exportSpans.
	timer *time.Timer
	// stopWait tracks the worker goroutine started by NewBatchSpanProcessor;
	// Shutdown waits on it.
	stopWait sync.WaitGroup
	// stopOnce ensures the body of Shutdown runs only once.
	stopOnce sync.Once
	// stopCh is closed by Shutdown to tell the worker and enqueue to stop.
	stopCh chan struct{}
}
76
// Compile-time check that *BatchSpanProcessor implements SpanProcessor.
var _ SpanProcessor = (*BatchSpanProcessor)(nil)
78
79// NewBatchSpanProcessor creates a new BatchSpanProcessor that will send
80// SpanSnapshot batches to the exporters with the supplied options.
81//
82// The returned BatchSpanProcessor needs to be registered with the SDK using
83// the RegisterSpanProcessor method for it to process spans.
84//
85// If the exporter is nil, the span processor will preform no action.
86func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) *BatchSpanProcessor {
87	o := BatchSpanProcessorOptions{
88		BatchTimeout:       DefaultBatchTimeout,
89		MaxQueueSize:       DefaultMaxQueueSize,
90		MaxExportBatchSize: DefaultMaxExportBatchSize,
91	}
92	for _, opt := range options {
93		opt(&o)
94	}
95	bsp := &BatchSpanProcessor{
96		e:      exporter,
97		o:      o,
98		batch:  make([]*export.SpanSnapshot, 0, o.MaxExportBatchSize),
99		timer:  time.NewTimer(o.BatchTimeout),
100		queue:  make(chan *export.SpanSnapshot, o.MaxQueueSize),
101		stopCh: make(chan struct{}),
102	}
103
104	bsp.stopWait.Add(1)
105	go func() {
106		defer bsp.stopWait.Done()
107		bsp.processQueue()
108		bsp.drainQueue()
109	}()
110
111	return bsp
112}
113
// OnStart does nothing; this processor only handles spans once they have
// ended (see OnEnd).
func (bsp *BatchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
116
117// OnEnd method enqueues a ReadOnlySpan for later processing.
118func (bsp *BatchSpanProcessor) OnEnd(s ReadOnlySpan) {
119	// Do not enqueue spans if we are just going to drop them.
120	if bsp.e == nil {
121		return
122	}
123	bsp.enqueue(s.Snapshot())
124}
125
126// Shutdown flushes the queue and waits until all spans are processed.
127// It only executes once. Subsequent call does nothing.
128func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
129	var err error
130	bsp.stopOnce.Do(func() {
131		wait := make(chan struct{})
132		go func() {
133			close(bsp.stopCh)
134			bsp.stopWait.Wait()
135			if bsp.e != nil {
136				if err := bsp.e.Shutdown(ctx); err != nil {
137					otel.Handle(err)
138				}
139			}
140			close(wait)
141		}()
142		// Wait until the wait group is done or the context is cancelled
143		select {
144		case <-wait:
145		case <-ctx.Done():
146			err = ctx.Err()
147		}
148	})
149	return err
150}
151
// ForceFlush exports the spans currently held in the batch by calling
// exportSpans, which also resets the batch timer. Spans that are still
// waiting in the queue channel are not moved into the batch by this call.
func (bsp *BatchSpanProcessor) ForceFlush() {
	bsp.exportSpans()
}
156
// WithMaxQueueSize returns an option that sets MaxQueueSize to size.
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
	return func(o *BatchSpanProcessorOptions) {
		o.MaxQueueSize = size
	}
}
162
// WithMaxExportBatchSize returns an option that sets MaxExportBatchSize to
// size.
func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
	return func(o *BatchSpanProcessorOptions) {
		o.MaxExportBatchSize = size
	}
}
168
// WithBatchTimeout returns an option that sets BatchTimeout to delay.
func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
	return func(o *BatchSpanProcessorOptions) {
		o.BatchTimeout = delay
	}
}
174
// WithBlocking returns an option that sets BlockOnQueueFull to true, so that
// enqueueing a span waits for room in the queue instead of dropping the span.
func WithBlocking() BatchSpanProcessorOption {
	return func(o *BatchSpanProcessorOptions) {
		o.BlockOnQueueFull = true
	}
}
180
181// exportSpans is a subroutine of processing and draining the queue.
182func (bsp *BatchSpanProcessor) exportSpans() {
183	bsp.timer.Reset(bsp.o.BatchTimeout)
184
185	bsp.batchMutex.Lock()
186	defer bsp.batchMutex.Unlock()
187
188	if len(bsp.batch) > 0 {
189		if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil {
190			otel.Handle(err)
191		}
192		bsp.batch = bsp.batch[:0]
193	}
194}
195
196// processQueue removes spans from the `queue` channel until processor
197// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
198// waiting up to BatchTimeout to form a batch.
199func (bsp *BatchSpanProcessor) processQueue() {
200	defer bsp.timer.Stop()
201
202	for {
203		select {
204		case <-bsp.stopCh:
205			return
206		case <-bsp.timer.C:
207			bsp.exportSpans()
208		case sd := <-bsp.queue:
209			bsp.batchMutex.Lock()
210			bsp.batch = append(bsp.batch, sd)
211			shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
212			bsp.batchMutex.Unlock()
213			if shouldExport {
214				if !bsp.timer.Stop() {
215					<-bsp.timer.C
216				}
217				bsp.exportSpans()
218			}
219		}
220	}
221}
222
223// drainQueue awaits the any caller that had added to bsp.stopWait
224// to finish the enqueue, then exports the final batch.
225func (bsp *BatchSpanProcessor) drainQueue() {
226	for {
227		select {
228		case sd := <-bsp.queue:
229			if sd == nil {
230				bsp.exportSpans()
231				return
232			}
233
234			bsp.batchMutex.Lock()
235			bsp.batch = append(bsp.batch, sd)
236			shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
237			bsp.batchMutex.Unlock()
238
239			if shouldExport {
240				bsp.exportSpans()
241			}
242		default:
243			close(bsp.queue)
244		}
245	}
246}
247
248func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanSnapshot) {
249	if !sd.SpanContext.IsSampled() {
250		return
251	}
252
253	// This ensures the bsp.queue<- below does not panic as the
254	// processor shuts down.
255	defer func() {
256		x := recover()
257		switch err := x.(type) {
258		case nil:
259			return
260		case runtime.Error:
261			if err.Error() == "send on closed channel" {
262				return
263			}
264		}
265		panic(x)
266	}()
267
268	select {
269	case <-bsp.stopCh:
270		return
271	default:
272	}
273
274	if bsp.o.BlockOnQueueFull {
275		bsp.queue <- sd
276		return
277	}
278
279	select {
280	case bsp.queue <- sd:
281	default:
282		atomic.AddUint32(&bsp.dropped, 1)
283	}
284}
285