// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
	"context"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/otel"
)

// Default values applied by NewBatchSpanProcessor before any
// BatchSpanProcessorOption is evaluated.
const (
	DefaultMaxQueueSize       = 2048
	DefaultBatchTimeout       = 5000 * time.Millisecond
	DefaultExportTimeout      = 30000 * time.Millisecond
	DefaultMaxExportBatchSize = 512
)

// BatchSpanProcessorOption mutates a BatchSpanProcessorOptions value. Options
// are applied in the order they are passed to NewBatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

// BatchSpanProcessorOptions holds the tunable settings of the batch span
// processor.
type BatchSpanProcessorOptions struct {
	// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
	// queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
	// The default value of MaxQueueSize is 2048.
	MaxQueueSize int

	// BatchTimeout is the maximum duration for constructing a batch. Processor
	// forcefully sends available spans when timeout is reached.
	// The default value of BatchTimeout is 5000 msec.
	BatchTimeout time.Duration

	// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
	// is reached, the export will be cancelled. A value of zero or less disables
	// the timeout.
	// The default value of ExportTimeout is 30000 msec.
	ExportTimeout time.Duration

	// MaxExportBatchSize is the maximum number of spans to process in a single batch.
	// If there are more than one batch worth of spans then it processes multiple batches
	// of spans one batch after the other without any delay.
	// The default value of MaxExportBatchSize is 512.
	MaxExportBatchSize int

	// BlockOnQueueFull makes OnEnd block while the queue is full instead of
	// dropping the span. OnStart is a no-op and is never affected.
	// Blocking option should be used carefully as it can severely affect the performance of an
	// application.
	BlockOnQueueFull bool
}

// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
// SpanSnapshots and sends them to a trace.Exporter when complete.
type batchSpanProcessor struct {
	e SpanExporter
	o BatchSpanProcessorOptions

	// queue carries snapshots from OnEnd to the worker goroutine; dropped
	// counts snapshots discarded because the queue was full and is updated
	// atomically.
	queue   chan *SpanSnapshot
	dropped uint32

	// batch accumulates snapshots for the next export and is guarded by
	// batchMutex. timer fires when BatchTimeout elapses. stopCh is closed by
	// Shutdown, stopWait tracks the worker goroutine, and stopOnce makes
	// Shutdown run only once.
	batch      []*SpanSnapshot
	batchMutex sync.Mutex
	timer      *time.Timer
	stopWait   sync.WaitGroup
	stopOnce   sync.Once
	stopCh     chan struct{}
}

var _ SpanProcessor = (*batchSpanProcessor)(nil)

// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
// span batches to the exporter with the supplied options.
//
// If the exporter is nil, the span processor will perform no action.
88func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor { 89 o := BatchSpanProcessorOptions{ 90 BatchTimeout: DefaultBatchTimeout, 91 ExportTimeout: DefaultExportTimeout, 92 MaxQueueSize: DefaultMaxQueueSize, 93 MaxExportBatchSize: DefaultMaxExportBatchSize, 94 } 95 for _, opt := range options { 96 opt(&o) 97 } 98 bsp := &batchSpanProcessor{ 99 e: exporter, 100 o: o, 101 batch: make([]*SpanSnapshot, 0, o.MaxExportBatchSize), 102 timer: time.NewTimer(o.BatchTimeout), 103 queue: make(chan *SpanSnapshot, o.MaxQueueSize), 104 stopCh: make(chan struct{}), 105 } 106 107 bsp.stopWait.Add(1) 108 go func() { 109 defer bsp.stopWait.Done() 110 bsp.processQueue() 111 bsp.drainQueue() 112 }() 113 114 return bsp 115} 116 117// OnStart method does nothing. 118func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {} 119 120// OnEnd method enqueues a ReadOnlySpan for later processing. 121func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) { 122 // Do not enqueue spans if we are just going to drop them. 123 if bsp.e == nil { 124 return 125 } 126 bsp.enqueue(s.Snapshot()) 127} 128 129// Shutdown flushes the queue and waits until all spans are processed. 130// It only executes once. Subsequent call does nothing. 131func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { 132 var err error 133 bsp.stopOnce.Do(func() { 134 wait := make(chan struct{}) 135 go func() { 136 close(bsp.stopCh) 137 bsp.stopWait.Wait() 138 if bsp.e != nil { 139 if err := bsp.e.Shutdown(ctx); err != nil { 140 otel.Handle(err) 141 } 142 } 143 close(wait) 144 }() 145 // Wait until the wait group is done or the context is cancelled 146 select { 147 case <-wait: 148 case <-ctx.Done(): 149 err = ctx.Err() 150 } 151 }) 152 return err 153} 154 155// ForceFlush exports all ended spans that have not yet been exported. 
156func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { 157 var err error 158 if bsp.e != nil { 159 wait := make(chan struct{}) 160 go func() { 161 if err := bsp.exportSpans(ctx); err != nil { 162 otel.Handle(err) 163 } 164 close(wait) 165 }() 166 // Wait until the export is finished or the context is cancelled/timed out 167 select { 168 case <-wait: 169 case <-ctx.Done(): 170 err = ctx.Err() 171 } 172 } 173 return err 174} 175 176func WithMaxQueueSize(size int) BatchSpanProcessorOption { 177 return func(o *BatchSpanProcessorOptions) { 178 o.MaxQueueSize = size 179 } 180} 181 182func WithMaxExportBatchSize(size int) BatchSpanProcessorOption { 183 return func(o *BatchSpanProcessorOptions) { 184 o.MaxExportBatchSize = size 185 } 186} 187 188func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption { 189 return func(o *BatchSpanProcessorOptions) { 190 o.BatchTimeout = delay 191 } 192} 193 194func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption { 195 return func(o *BatchSpanProcessorOptions) { 196 o.ExportTimeout = timeout 197 } 198} 199 200func WithBlocking() BatchSpanProcessorOption { 201 return func(o *BatchSpanProcessorOptions) { 202 o.BlockOnQueueFull = true 203 } 204} 205 206// exportSpans is a subroutine of processing and draining the queue. 207func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { 208 bsp.timer.Reset(bsp.o.BatchTimeout) 209 210 bsp.batchMutex.Lock() 211 defer bsp.batchMutex.Unlock() 212 213 if bsp.o.ExportTimeout > 0 { 214 var cancel context.CancelFunc 215 ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) 216 defer cancel() 217 } 218 219 if len(bsp.batch) > 0 { 220 if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil { 221 return err 222 } 223 bsp.batch = bsp.batch[:0] 224 } 225 return nil 226} 227 228// processQueue removes spans from the `queue` channel until processor 229// is shut down. 
It calls the exporter in batches of up to MaxExportBatchSize 230// waiting up to BatchTimeout to form a batch. 231func (bsp *batchSpanProcessor) processQueue() { 232 defer bsp.timer.Stop() 233 234 ctx, cancel := context.WithCancel(context.Background()) 235 defer cancel() 236 for { 237 select { 238 case <-bsp.stopCh: 239 return 240 case <-bsp.timer.C: 241 if err := bsp.exportSpans(ctx); err != nil { 242 otel.Handle(err) 243 } 244 case sd := <-bsp.queue: 245 bsp.batchMutex.Lock() 246 bsp.batch = append(bsp.batch, sd) 247 shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize 248 bsp.batchMutex.Unlock() 249 if shouldExport { 250 if !bsp.timer.Stop() { 251 <-bsp.timer.C 252 } 253 if err := bsp.exportSpans(ctx); err != nil { 254 otel.Handle(err) 255 } 256 } 257 } 258 } 259} 260 261// drainQueue awaits the any caller that had added to bsp.stopWait 262// to finish the enqueue, then exports the final batch. 263func (bsp *batchSpanProcessor) drainQueue() { 264 ctx, cancel := context.WithCancel(context.Background()) 265 defer cancel() 266 for { 267 select { 268 case sd := <-bsp.queue: 269 if sd == nil { 270 if err := bsp.exportSpans(ctx); err != nil { 271 otel.Handle(err) 272 } 273 return 274 } 275 276 bsp.batchMutex.Lock() 277 bsp.batch = append(bsp.batch, sd) 278 shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize 279 bsp.batchMutex.Unlock() 280 281 if shouldExport { 282 if err := bsp.exportSpans(ctx); err != nil { 283 otel.Handle(err) 284 } 285 } 286 default: 287 close(bsp.queue) 288 } 289 } 290} 291 292func (bsp *batchSpanProcessor) enqueue(sd *SpanSnapshot) { 293 if !sd.SpanContext.IsSampled() { 294 return 295 } 296 297 // This ensures the bsp.queue<- below does not panic as the 298 // processor shuts down. 
299 defer func() { 300 x := recover() 301 switch err := x.(type) { 302 case nil: 303 return 304 case runtime.Error: 305 if err.Error() == "send on closed channel" { 306 return 307 } 308 } 309 panic(x) 310 }() 311 312 select { 313 case <-bsp.stopCh: 314 return 315 default: 316 } 317 318 if bsp.o.BlockOnQueueFull { 319 bsp.queue <- sd 320 return 321 } 322 323 select { 324 case bsp.queue <- sd: 325 default: 326 atomic.AddUint32(&bsp.dropped, 1) 327 } 328} 329