1// Copyright (c) 2017 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger 16 17import ( 18 "fmt" 19 "sync" 20 "sync/atomic" 21 "time" 22 23 "github.com/opentracing/opentracing-go" 24 25 "github.com/uber/jaeger-client-go/internal/reporterstats" 26 "github.com/uber/jaeger-client-go/log" 27) 28 29// Reporter is called by the tracer when a span is completed to report the span to the tracing collector. 30type Reporter interface { 31 // Report submits a new span to collectors, possibly asynchronously and/or with buffering. 32 // If the reporter is processing Span asynchronously then it needs to Retain() the span, 33 // and then Release() it when no longer needed, to avoid span data corruption. 34 Report(span *Span) 35 36 // Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory. 37 Close() 38} 39 40// ------------------------------ 41 42type nullReporter struct{} 43 44// NewNullReporter creates a no-op reporter that ignores all reported spans. 45func NewNullReporter() Reporter { 46 return &nullReporter{} 47} 48 49// Report implements Report() method of Reporter by doing nothing. 50func (r *nullReporter) Report(span *Span) { 51 // no-op 52} 53 54// Close implements Close() method of Reporter by doing nothing. 55func (r *nullReporter) Close() { 56 // no-op 57} 58 59// ------------------------------ 60 61type loggingReporter struct { 62 logger Logger 63} 64 65// NewLoggingReporter creates a reporter that logs all reported spans to provided logger. 66func NewLoggingReporter(logger Logger) Reporter { 67 return &loggingReporter{logger} 68} 69 70// Report implements Report() method of Reporter by logging the span to the logger. 71func (r *loggingReporter) Report(span *Span) { 72 r.logger.Infof("Reporting span %+v", span) 73} 74 75// Close implements Close() method of Reporter by doing nothing. 76func (r *loggingReporter) Close() { 77 // no-op 78} 79 80// ------------------------------ 81 82// InMemoryReporter is used for testing, and simply collects spans in memory. 83type InMemoryReporter struct { 84 spans []opentracing.Span 85 lock sync.Mutex 86} 87 88// NewInMemoryReporter creates a reporter that stores spans in memory. 89// NOTE: the Tracer should be created with options.PoolSpans = false. 90func NewInMemoryReporter() *InMemoryReporter { 91 return &InMemoryReporter{ 92 spans: make([]opentracing.Span, 0, 10), 93 } 94} 95 96// Report implements Report() method of Reporter by storing the span in the buffer. 97func (r *InMemoryReporter) Report(span *Span) { 98 r.lock.Lock() 99 // Need to retain the span otherwise it will be released 100 r.spans = append(r.spans, span.Retain()) 101 r.lock.Unlock() 102} 103 104// Close implements Close() method of Reporter 105func (r *InMemoryReporter) Close() { 106 r.Reset() 107} 108 109// SpansSubmitted returns the number of spans accumulated in the buffer. 110func (r *InMemoryReporter) SpansSubmitted() int { 111 r.lock.Lock() 112 defer r.lock.Unlock() 113 return len(r.spans) 114} 115 116// GetSpans returns accumulated spans as a copy of the buffer. 117func (r *InMemoryReporter) GetSpans() []opentracing.Span { 118 r.lock.Lock() 119 defer r.lock.Unlock() 120 copied := make([]opentracing.Span, len(r.spans)) 121 copy(copied, r.spans) 122 return copied 123} 124 125// Reset clears all accumulated spans. 126func (r *InMemoryReporter) Reset() { 127 r.lock.Lock() 128 defer r.lock.Unlock() 129 130 // Before reset the collection need to release Span memory 131 for _, span := range r.spans { 132 span.(*Span).Release() 133 } 134 r.spans = r.spans[:0] 135} 136 137// ------------------------------ 138 139type compositeReporter struct { 140 reporters []Reporter 141} 142 143// NewCompositeReporter creates a reporter that ignores all reported spans. 144func NewCompositeReporter(reporters ...Reporter) Reporter { 145 return &compositeReporter{reporters: reporters} 146} 147 148// Report implements Report() method of Reporter by delegating to each underlying reporter. 149func (r *compositeReporter) Report(span *Span) { 150 for _, reporter := range r.reporters { 151 reporter.Report(span) 152 } 153} 154 155// Close implements Close() method of Reporter by closing each underlying reporter. 156func (r *compositeReporter) Close() { 157 for _, reporter := range r.reporters { 158 reporter.Close() 159 } 160} 161 162// ------------- REMOTE REPORTER ----------------- 163 164type reporterQueueItemType int 165 166const ( 167 defaultQueueSize = 100 168 defaultBufferFlushInterval = 1 * time.Second 169 170 reporterQueueItemSpan reporterQueueItemType = iota 171 reporterQueueItemClose 172) 173 174type reporterQueueItem struct { 175 itemType reporterQueueItemType 176 span *Span 177 close *sync.WaitGroup 178} 179 180// reporterStats implements reporterstats.ReporterStats. 181type reporterStats struct { 182 droppedCount int64 // provided to Transports to report data loss to the backend 183} 184 185// SpansDroppedFromQueue implements reporterstats.ReporterStats. 186func (r *reporterStats) SpansDroppedFromQueue() int64 { 187 return atomic.LoadInt64(&r.droppedCount) 188} 189 190func (r *reporterStats) incDroppedCount() { 191 atomic.AddInt64(&r.droppedCount, 1) 192} 193 194type remoteReporter struct { 195 // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment. 196 // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq 197 queueLength int64 // used to update metrics.Gauge 198 closed int64 // 0 - not closed, 1 - closed 199 200 reporterOptions 201 202 sender Transport 203 queue chan reporterQueueItem 204 reporterStats *reporterStats 205} 206 207// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender. 208// Calls to Report(Span) return immediately (side effect: if internal buffer is full the span is dropped). 209// Periodically the transport buffer is flushed even if it hasn't reached max packet size. 210// Calls to Close() block until all spans reported prior to the call to Close are flushed. 211func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter { 212 options := reporterOptions{} 213 for _, option := range opts { 214 option(&options) 215 } 216 if options.bufferFlushInterval <= 0 { 217 options.bufferFlushInterval = defaultBufferFlushInterval 218 } 219 if options.logger == nil { 220 options.logger = log.NullLogger 221 } 222 if options.metrics == nil { 223 options.metrics = NewNullMetrics() 224 } 225 if options.queueSize <= 0 { 226 options.queueSize = defaultQueueSize 227 } 228 reporter := &remoteReporter{ 229 reporterOptions: options, 230 sender: sender, 231 queue: make(chan reporterQueueItem, options.queueSize), 232 reporterStats: new(reporterStats), 233 } 234 if receiver, ok := sender.(reporterstats.Receiver); ok { 235 receiver.SetReporterStats(reporter.reporterStats) 236 } 237 go reporter.processQueue() 238 return reporter 239} 240 241// Report implements Report() method of Reporter. 242// It passes the span to a background go-routine for submission to Jaeger backend. 243// If the internal queue is full, the span is dropped and metrics.ReporterDropped counter is incremented. 244// If Report() is called after the reporter has been Close()-ed, the additional spans will not be 245// sent to the backend, but the metrics.ReporterDropped counter may not reflect them correctly, 246// because some of them may still be successfully added to the queue. 247func (r *remoteReporter) Report(span *Span) { 248 select { 249 // Need to retain the span otherwise it will be released 250 case r.queue <- reporterQueueItem{itemType: reporterQueueItemSpan, span: span.Retain()}: 251 atomic.AddInt64(&r.queueLength, 1) 252 default: 253 r.metrics.ReporterDropped.Inc(1) 254 r.reporterStats.incDroppedCount() 255 } 256} 257 258// Close implements Close() method of Reporter by waiting for the queue to be drained. 259func (r *remoteReporter) Close() { 260 r.logger.Debugf("closing reporter") 261 if swapped := atomic.CompareAndSwapInt64(&r.closed, 0, 1); !swapped { 262 r.logger.Error("Repeated attempt to close the reporter is ignored") 263 return 264 } 265 r.sendCloseEvent() 266 _ = r.sender.Close() 267} 268 269func (r *remoteReporter) sendCloseEvent() { 270 wg := &sync.WaitGroup{} 271 wg.Add(1) 272 item := reporterQueueItem{itemType: reporterQueueItemClose, close: wg} 273 274 r.queue <- item // if the queue is full we will block until there is space 275 atomic.AddInt64(&r.queueLength, 1) 276 wg.Wait() 277} 278 279// processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer. 280// When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger. 281// Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped 282// reporting new spans. 283func (r *remoteReporter) processQueue() { 284 // flush causes the Sender to flush its accumulated spans and clear the buffer 285 flush := func() { 286 if flushed, err := r.sender.Flush(); err != nil { 287 r.metrics.ReporterFailure.Inc(int64(flushed)) 288 r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error())) 289 } else if flushed > 0 { 290 r.metrics.ReporterSuccess.Inc(int64(flushed)) 291 } 292 } 293 294 timer := time.NewTicker(r.bufferFlushInterval) 295 for { 296 select { 297 case <-timer.C: 298 flush() 299 case item := <-r.queue: 300 atomic.AddInt64(&r.queueLength, -1) 301 switch item.itemType { 302 case reporterQueueItemSpan: 303 span := item.span 304 if flushed, err := r.sender.Append(span); err != nil { 305 r.metrics.ReporterFailure.Inc(int64(flushed)) 306 r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error())) 307 } else if flushed > 0 { 308 r.metrics.ReporterSuccess.Inc(int64(flushed)) 309 // to reduce the number of gauge stats, we only emit queue length on flush 310 r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength)) 311 r.logger.Debugf("flushed %d spans", flushed) 312 } 313 span.Release() 314 case reporterQueueItemClose: 315 timer.Stop() 316 flush() 317 item.close.Done() 318 return 319 } 320 } 321 } 322} 323