1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package trace // import "go.opentelemetry.io/otel/sdk/trace"
16
17import (
18	"context"
19	"runtime"
20	"sync"
21	"sync/atomic"
22	"time"
23
24	"go.opentelemetry.io/otel"
25)
26
// Default values for BatchSpanProcessorOptions. NewBatchSpanProcessor starts
// from these and then applies any BatchSpanProcessorOption it is given.
const (
	// DefaultMaxQueueSize is the default number of span snapshots that can be
	// buffered while waiting to be added to a batch.
	DefaultMaxQueueSize = 2048
	// DefaultBatchTimeout is the default maximum time spent building a batch
	// before it is exported.
	DefaultBatchTimeout = 5 * time.Second
	// DefaultExportTimeout is the default time limit for a single export call.
	DefaultExportTimeout = 30 * time.Second
	// DefaultMaxExportBatchSize is the default maximum number of spans sent to
	// the exporter in one batch.
	DefaultMaxExportBatchSize = 512
)
33
// BatchSpanProcessorOption configures a batch span processor by modifying the
// BatchSpanProcessorOptions it is given.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

// BatchSpanProcessorOptions holds the configuration of a batch span processor.
type BatchSpanProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
	// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
	// The default value of MaxQueueSize is 2048.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available spans when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
	// is reached, the export will be cancelled. A non-positive value disables the
	// export timeout.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of spans to process in a single batch.
	// If there are more than one batch worth of spans then it processes multiple batches
	// of spans one batch after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// BlockOnQueueFull, when true, makes OnEnd wait for room in a full queue
	// instead of dropping the span. (OnStart never enqueues, so it is not affected.)
	// Blocking option should be used carefully as it can severely affect the performance of an
	// application.
	BlockOnQueueFull bool
}
64
65// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
66// SpanSnapshots and sends them to a trace.Exporter when complete.
67type batchSpanProcessor struct {
68	e SpanExporter
69	o BatchSpanProcessorOptions
70
71	queue   chan *SpanSnapshot
72	dropped uint32
73
74	batch      []*SpanSnapshot
75	batchMutex sync.Mutex
76	timer      *time.Timer
77	stopWait   sync.WaitGroup
78	stopOnce   sync.Once
79	stopCh     chan struct{}
80}
81
82var _ SpanProcessor = (*batchSpanProcessor)(nil)
83
84// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
85// span batches to the exporter with the supplied options.
86//
87// If the exporter is nil, the span processor will preform no action.
88func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
89	o := BatchSpanProcessorOptions{
90		BatchTimeout:       DefaultBatchTimeout,
91		ExportTimeout:      DefaultExportTimeout,
92		MaxQueueSize:       DefaultMaxQueueSize,
93		MaxExportBatchSize: DefaultMaxExportBatchSize,
94	}
95	for _, opt := range options {
96		opt(&o)
97	}
98	bsp := &batchSpanProcessor{
99		e:      exporter,
100		o:      o,
101		batch:  make([]*SpanSnapshot, 0, o.MaxExportBatchSize),
102		timer:  time.NewTimer(o.BatchTimeout),
103		queue:  make(chan *SpanSnapshot, o.MaxQueueSize),
104		stopCh: make(chan struct{}),
105	}
106
107	bsp.stopWait.Add(1)
108	go func() {
109		defer bsp.stopWait.Done()
110		bsp.processQueue()
111		bsp.drainQueue()
112	}()
113
114	return bsp
115}
116
117// OnStart method does nothing.
118func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
119
120// OnEnd method enqueues a ReadOnlySpan for later processing.
121func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
122	// Do not enqueue spans if we are just going to drop them.
123	if bsp.e == nil {
124		return
125	}
126	bsp.enqueue(s.Snapshot())
127}
128
129// Shutdown flushes the queue and waits until all spans are processed.
130// It only executes once. Subsequent call does nothing.
131func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
132	var err error
133	bsp.stopOnce.Do(func() {
134		wait := make(chan struct{})
135		go func() {
136			close(bsp.stopCh)
137			bsp.stopWait.Wait()
138			if bsp.e != nil {
139				if err := bsp.e.Shutdown(ctx); err != nil {
140					otel.Handle(err)
141				}
142			}
143			close(wait)
144		}()
145		// Wait until the wait group is done or the context is cancelled
146		select {
147		case <-wait:
148		case <-ctx.Done():
149			err = ctx.Err()
150		}
151	})
152	return err
153}
154
155// ForceFlush exports all ended spans that have not yet been exported.
156func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
157	var err error
158	if bsp.e != nil {
159		wait := make(chan struct{})
160		go func() {
161			if err := bsp.exportSpans(ctx); err != nil {
162				otel.Handle(err)
163			}
164			close(wait)
165		}()
166		// Wait until the export is finished or the context is cancelled/timed out
167		select {
168		case <-wait:
169		case <-ctx.Done():
170			err = ctx.Err()
171		}
172	}
173	return err
174}
175
176func WithMaxQueueSize(size int) BatchSpanProcessorOption {
177	return func(o *BatchSpanProcessorOptions) {
178		o.MaxQueueSize = size
179	}
180}
181
182func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
183	return func(o *BatchSpanProcessorOptions) {
184		o.MaxExportBatchSize = size
185	}
186}
187
188func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
189	return func(o *BatchSpanProcessorOptions) {
190		o.BatchTimeout = delay
191	}
192}
193
194func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
195	return func(o *BatchSpanProcessorOptions) {
196		o.ExportTimeout = timeout
197	}
198}
199
200func WithBlocking() BatchSpanProcessorOption {
201	return func(o *BatchSpanProcessorOptions) {
202		o.BlockOnQueueFull = true
203	}
204}
205
206// exportSpans is a subroutine of processing and draining the queue.
207func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
208	bsp.timer.Reset(bsp.o.BatchTimeout)
209
210	bsp.batchMutex.Lock()
211	defer bsp.batchMutex.Unlock()
212
213	if bsp.o.ExportTimeout > 0 {
214		var cancel context.CancelFunc
215		ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
216		defer cancel()
217	}
218
219	if len(bsp.batch) > 0 {
220		if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil {
221			return err
222		}
223		bsp.batch = bsp.batch[:0]
224	}
225	return nil
226}
227
228// processQueue removes spans from the `queue` channel until processor
229// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
230// waiting up to BatchTimeout to form a batch.
231func (bsp *batchSpanProcessor) processQueue() {
232	defer bsp.timer.Stop()
233
234	ctx, cancel := context.WithCancel(context.Background())
235	defer cancel()
236	for {
237		select {
238		case <-bsp.stopCh:
239			return
240		case <-bsp.timer.C:
241			if err := bsp.exportSpans(ctx); err != nil {
242				otel.Handle(err)
243			}
244		case sd := <-bsp.queue:
245			bsp.batchMutex.Lock()
246			bsp.batch = append(bsp.batch, sd)
247			shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
248			bsp.batchMutex.Unlock()
249			if shouldExport {
250				if !bsp.timer.Stop() {
251					<-bsp.timer.C
252				}
253				if err := bsp.exportSpans(ctx); err != nil {
254					otel.Handle(err)
255				}
256			}
257		}
258	}
259}
260
261// drainQueue awaits the any caller that had added to bsp.stopWait
262// to finish the enqueue, then exports the final batch.
263func (bsp *batchSpanProcessor) drainQueue() {
264	ctx, cancel := context.WithCancel(context.Background())
265	defer cancel()
266	for {
267		select {
268		case sd := <-bsp.queue:
269			if sd == nil {
270				if err := bsp.exportSpans(ctx); err != nil {
271					otel.Handle(err)
272				}
273				return
274			}
275
276			bsp.batchMutex.Lock()
277			bsp.batch = append(bsp.batch, sd)
278			shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
279			bsp.batchMutex.Unlock()
280
281			if shouldExport {
282				if err := bsp.exportSpans(ctx); err != nil {
283					otel.Handle(err)
284				}
285			}
286		default:
287			close(bsp.queue)
288		}
289	}
290}
291
292func (bsp *batchSpanProcessor) enqueue(sd *SpanSnapshot) {
293	if !sd.SpanContext.IsSampled() {
294		return
295	}
296
297	// This ensures the bsp.queue<- below does not panic as the
298	// processor shuts down.
299	defer func() {
300		x := recover()
301		switch err := x.(type) {
302		case nil:
303			return
304		case runtime.Error:
305			if err.Error() == "send on closed channel" {
306				return
307			}
308		}
309		panic(x)
310	}()
311
312	select {
313	case <-bsp.stopCh:
314		return
315	default:
316	}
317
318	if bsp.o.BlockOnQueueFull {
319		bsp.queue <- sd
320		return
321	}
322
323	select {
324	case bsp.queue <- sd:
325	default:
326		atomic.AddUint32(&bsp.dropped, 1)
327	}
328}
329