1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package trace // import "go.opentelemetry.io/otel/sdk/trace" 16 17import ( 18 "context" 19 "runtime" 20 "sync" 21 "sync/atomic" 22 "time" 23 24 "go.opentelemetry.io/otel" 25 export "go.opentelemetry.io/otel/sdk/export/trace" 26) 27 28const ( 29 DefaultMaxQueueSize = 2048 30 DefaultBatchTimeout = 5000 * time.Millisecond 31 DefaultMaxExportBatchSize = 512 32) 33 34type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions) 35 36type BatchSpanProcessorOptions struct { 37 // MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the 38 // queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior. 39 // The default value of MaxQueueSize is 2048. 40 MaxQueueSize int 41 42 // BatchTimeout is the maximum duration for constructing a batch. Processor 43 // forcefully sends available spans when timeout is reached. 44 // The default value of BatchTimeout is 5000 msec. 45 BatchTimeout time.Duration 46 47 // MaxExportBatchSize is the maximum number of spans to process in a single batch. 48 // If there are more than one batch worth of spans then it processes multiple batches 49 // of spans one batch after the other without any delay. 50 // The default value of MaxExportBatchSize is 512. 51 MaxExportBatchSize int 52 53 // BlockOnQueueFull blocks onEnd() and onStart() method if the queue is full 54 // AND if BlockOnQueueFull is set to true. 55 // Blocking option should be used carefully as it can severely affect the performance of an 56 // application. 57 BlockOnQueueFull bool 58} 59 60// BatchSpanProcessor is a SpanProcessor that batches asynchronously-received 61// SpanSnapshots and sends them to a trace.Exporter when complete. 62type BatchSpanProcessor struct { 63 e export.SpanExporter 64 o BatchSpanProcessorOptions 65 66 queue chan *export.SpanSnapshot 67 dropped uint32 68 69 batch []*export.SpanSnapshot 70 batchMutex sync.Mutex 71 timer *time.Timer 72 stopWait sync.WaitGroup 73 stopOnce sync.Once 74 stopCh chan struct{} 75} 76 77var _ SpanProcessor = (*BatchSpanProcessor)(nil) 78 79// NewBatchSpanProcessor creates a new BatchSpanProcessor that will send 80// SpanSnapshot batches to the exporters with the supplied options. 81// 82// The returned BatchSpanProcessor needs to be registered with the SDK using 83// the RegisterSpanProcessor method for it to process spans. 84// 85// If the exporter is nil, the span processor will preform no action. 86func NewBatchSpanProcessor(exporter export.SpanExporter, options ...BatchSpanProcessorOption) *BatchSpanProcessor { 87 o := BatchSpanProcessorOptions{ 88 BatchTimeout: DefaultBatchTimeout, 89 MaxQueueSize: DefaultMaxQueueSize, 90 MaxExportBatchSize: DefaultMaxExportBatchSize, 91 } 92 for _, opt := range options { 93 opt(&o) 94 } 95 bsp := &BatchSpanProcessor{ 96 e: exporter, 97 o: o, 98 batch: make([]*export.SpanSnapshot, 0, o.MaxExportBatchSize), 99 timer: time.NewTimer(o.BatchTimeout), 100 queue: make(chan *export.SpanSnapshot, o.MaxQueueSize), 101 stopCh: make(chan struct{}), 102 } 103 104 bsp.stopWait.Add(1) 105 go func() { 106 defer bsp.stopWait.Done() 107 bsp.processQueue() 108 bsp.drainQueue() 109 }() 110 111 return bsp 112} 113 114// OnStart method does nothing. 115func (bsp *BatchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {} 116 117// OnEnd method enqueues a ReadOnlySpan for later processing. 118func (bsp *BatchSpanProcessor) OnEnd(s ReadOnlySpan) { 119 // Do not enqueue spans if we are just going to drop them. 120 if bsp.e == nil { 121 return 122 } 123 bsp.enqueue(s.Snapshot()) 124} 125 126// Shutdown flushes the queue and waits until all spans are processed. 127// It only executes once. Subsequent call does nothing. 128func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error { 129 var err error 130 bsp.stopOnce.Do(func() { 131 wait := make(chan struct{}) 132 go func() { 133 close(bsp.stopCh) 134 bsp.stopWait.Wait() 135 if bsp.e != nil { 136 if err := bsp.e.Shutdown(ctx); err != nil { 137 otel.Handle(err) 138 } 139 } 140 close(wait) 141 }() 142 // Wait until the wait group is done or the context is cancelled 143 select { 144 case <-wait: 145 case <-ctx.Done(): 146 err = ctx.Err() 147 } 148 }) 149 return err 150} 151 152// ForceFlush exports all ended spans that have not yet been exported. 153func (bsp *BatchSpanProcessor) ForceFlush() { 154 bsp.exportSpans() 155} 156 157func WithMaxQueueSize(size int) BatchSpanProcessorOption { 158 return func(o *BatchSpanProcessorOptions) { 159 o.MaxQueueSize = size 160 } 161} 162 163func WithMaxExportBatchSize(size int) BatchSpanProcessorOption { 164 return func(o *BatchSpanProcessorOptions) { 165 o.MaxExportBatchSize = size 166 } 167} 168 169func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption { 170 return func(o *BatchSpanProcessorOptions) { 171 o.BatchTimeout = delay 172 } 173} 174 175func WithBlocking() BatchSpanProcessorOption { 176 return func(o *BatchSpanProcessorOptions) { 177 o.BlockOnQueueFull = true 178 } 179} 180 181// exportSpans is a subroutine of processing and draining the queue. 182func (bsp *BatchSpanProcessor) exportSpans() { 183 bsp.timer.Reset(bsp.o.BatchTimeout) 184 185 bsp.batchMutex.Lock() 186 defer bsp.batchMutex.Unlock() 187 188 if len(bsp.batch) > 0 { 189 if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil { 190 otel.Handle(err) 191 } 192 bsp.batch = bsp.batch[:0] 193 } 194} 195 196// processQueue removes spans from the `queue` channel until processor 197// is shut down. It calls the exporter in batches of up to MaxExportBatchSize 198// waiting up to BatchTimeout to form a batch. 199func (bsp *BatchSpanProcessor) processQueue() { 200 defer bsp.timer.Stop() 201 202 for { 203 select { 204 case <-bsp.stopCh: 205 return 206 case <-bsp.timer.C: 207 bsp.exportSpans() 208 case sd := <-bsp.queue: 209 bsp.batchMutex.Lock() 210 bsp.batch = append(bsp.batch, sd) 211 shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize 212 bsp.batchMutex.Unlock() 213 if shouldExport { 214 if !bsp.timer.Stop() { 215 <-bsp.timer.C 216 } 217 bsp.exportSpans() 218 } 219 } 220 } 221} 222 223// drainQueue awaits the any caller that had added to bsp.stopWait 224// to finish the enqueue, then exports the final batch. 225func (bsp *BatchSpanProcessor) drainQueue() { 226 for { 227 select { 228 case sd := <-bsp.queue: 229 if sd == nil { 230 bsp.exportSpans() 231 return 232 } 233 234 bsp.batchMutex.Lock() 235 bsp.batch = append(bsp.batch, sd) 236 shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize 237 bsp.batchMutex.Unlock() 238 239 if shouldExport { 240 bsp.exportSpans() 241 } 242 default: 243 close(bsp.queue) 244 } 245 } 246} 247 248func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanSnapshot) { 249 if !sd.SpanContext.IsSampled() { 250 return 251 } 252 253 // This ensures the bsp.queue<- below does not panic as the 254 // processor shuts down. 255 defer func() { 256 x := recover() 257 switch err := x.(type) { 258 case nil: 259 return 260 case runtime.Error: 261 if err.Error() == "send on closed channel" { 262 return 263 } 264 } 265 panic(x) 266 }() 267 268 select { 269 case <-bsp.stopCh: 270 return 271 default: 272 } 273 274 if bsp.o.BlockOnQueueFull { 275 bsp.queue <- sd 276 return 277 } 278 279 select { 280 case bsp.queue <- sd: 281 default: 282 atomic.AddUint32(&bsp.dropped, 1) 283 } 284} 285