1// Package lightstep implements the LightStep OpenTracing client for Go. 2package lightstep 3 4import ( 5 "context" 6 "fmt" 7 "runtime" 8 "sync" 9 "time" 10 11 "github.com/opentracing/opentracing-go" 12) 13 14// Tracer extends the `opentracing.Tracer` interface with methods for manual 15// flushing and closing. To access these methods, you can take the global 16// tracer and typecast it to a `lightstep.Tracer`. As a convenience, the 17// lightstep package provides static functions which perform the typecasting. 18type Tracer interface { 19 opentracing.Tracer 20 21 // Close flushes and then terminates the LightStep collector 22 Close(context.Context) 23 // Flush sends all spans currently in the buffer to the LighStep collector 24 Flush(context.Context) 25 // Options gets the Options used in New() or NewWithOptions(). 26 Options() Options 27 // Disable prevents the tracer from recording spans or flushing 28 Disable() 29} 30 31// Implements the `Tracer` interface. Buffers spans and forwards to a Lightstep collector. 32type tracerImpl struct { 33 ////////////////////////////////////////////////////////////// 34 // IMMUTABLE IMMUTABLE IMMUTABLE IMMUTABLE IMMUTABLE IMMUTABLE 35 ////////////////////////////////////////////////////////////// 36 37 // Note: there may be a desire to update some of these fields 38 // at runtime, in which case suitable changes may be needed 39 // for variables accessed during Flush. 40 41 reporterID uint64 // the LightStep tracer guid 42 opts Options 43 44 // report loop management 45 closeOnce sync.Once 46 closeReportLoopChannel chan struct{} 47 reportLoopClosedChannel chan struct{} 48 49 converter *protoConverter 50 accessToken string 51 attributes map[string]string 52 53 ////////////////////////////////////////////////////////// 54 // MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE 55 ////////////////////////////////////////////////////////// 56 57 // the following fields are modified under `lock`. 58 lock sync.Mutex 59 60 // Remote service that will receive reports. 61 client collectorClient 62 connection Connection 63 64 // Two buffers of data. 65 buffer reportBuffer 66 flushing reportBuffer 67 68 // Flush state. 69 flushingLock sync.Mutex 70 reportInFlight bool 71 lastReportAttempt time.Time 72 73 // Meta Event Reporting can be enabled at tracer creation or on-demand by satellite 74 metaEventReportingEnabled bool 75 // Set to true on first report 76 firstReportHasRun bool 77 78 // We allow our remote peer to disable this instrumentation at any 79 // time, turning all potentially costly runtime operations into 80 // no-ops. 81 // 82 // TODO this should use atomic load/store to test disabled 83 // prior to taking the lock, do please. 84 disabled bool 85 86 // Map of propagators used to determine the correct propagator to use 87 // based on the format passed into Inject/Extract. Supports one 88 // propagator for each of the formats: TextMap, HTTPHeaders, Binary 89 propagators map[opentracing.BuiltinFormat]Propagator 90} 91 92// NewTracer creates and starts a new Lightstep Tracer. 93// In case of error, we emit event and return nil. 94func NewTracer(opts Options) Tracer { 95 tr, err := CreateTracer(opts) 96 if err != nil { 97 emitEvent(newEventStartError(err)) 98 return nil 99 } 100 return tr 101} 102 103// CreateTracer creates and starts a new Lightstep Tracer. 104// It is meant to replace NewTracer which does not propagate the error. 105func CreateTracer(opts Options) (Tracer, error) { 106 if err := opts.Initialize(); err != nil { 107 return nil, fmt.Errorf("init; err: %v", err) 108 } 109 110 attributes := map[string]string{} 111 for k, v := range opts.Tags { 112 attributes[k] = fmt.Sprint(v) 113 } 114 // Don't let the GrpcOptions override these values. That would be confusing. 115 attributes[TracerPlatformKey] = TracerPlatformValue 116 attributes[TracerPlatformVersionKey] = runtime.Version() 117 attributes[TracerVersionKey] = TracerVersionValue 118 119 now := time.Now() 120 impl := &tracerImpl{ 121 opts: opts, 122 reporterID: genSeededGUID(), 123 buffer: newSpansBuffer(opts.MaxBufferedSpans), 124 flushing: newSpansBuffer(opts.MaxBufferedSpans), 125 closeReportLoopChannel: make(chan struct{}), 126 reportLoopClosedChannel: make(chan struct{}), 127 converter: newProtoConverter(opts), 128 accessToken: opts.AccessToken, 129 attributes: attributes, 130 } 131 132 impl.buffer.setCurrent(now) 133 134 var err error 135 impl.client, err = newCollectorClient(opts) 136 if err != nil { 137 return nil, fmt.Errorf("create collector client; err: %v", err) 138 } 139 140 conn, err := impl.client.ConnectClient() 141 if err != nil { 142 return nil, err 143 } 144 impl.connection = conn 145 146 // set meta reporting to defined option 147 impl.metaEventReportingEnabled = opts.MetaEventReportingEnabled 148 impl.firstReportHasRun = false 149 150 go impl.reportLoop() 151 152 impl.propagators = map[opentracing.BuiltinFormat]Propagator{ 153 opentracing.TextMap: theLightStepPropagator, 154 opentracing.HTTPHeaders: theLightStepPropagator, 155 opentracing.Binary: theBinaryPropagator, 156 } 157 158 if opts.Propagator == "b3" { 159 impl.propagators[opentracing.TextMap] = theB3Propagator 160 impl.propagators[opentracing.HTTPHeaders] = theB3Propagator 161 } 162 163 return impl, nil 164} 165 166func (tracer *tracerImpl) Options() Options { 167 return tracer.opts 168} 169 170func (tracer *tracerImpl) StartSpan( 171 operationName string, 172 sso ...opentracing.StartSpanOption, 173) opentracing.Span { 174 return newSpan(operationName, tracer, sso) 175} 176 177func (tracer *tracerImpl) Inject(sc opentracing.SpanContext, format interface{}, carrier interface{}) error { 178 if tracer.opts.MetaEventReportingEnabled { 179 opentracing.StartSpan(LSMetaEvent_InjectOperation, 180 opentracing.Tag{Key: LSMetaEvent_MetaEventKey, Value: true}, 181 opentracing.Tag{Key: LSMetaEvent_TraceIdKey, Value: sc.(SpanContext).TraceID}, 182 opentracing.Tag{Key: LSMetaEvent_SpanIdKey, Value: sc.(SpanContext).SpanID}, 183 opentracing.Tag{Key: LSMetaEvent_PropagationFormatKey, Value: format}). 184 Finish() 185 } 186 187 builtin, ok := format.(opentracing.BuiltinFormat) 188 if !ok { 189 return opentracing.ErrUnsupportedFormat 190 } 191 return tracer.propagators[builtin].Inject(sc, carrier) 192} 193 194func (tracer *tracerImpl) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) { 195 if tracer.opts.MetaEventReportingEnabled { 196 opentracing.StartSpan(LSMetaEvent_ExtractOperation, 197 opentracing.Tag{Key: LSMetaEvent_MetaEventKey, Value: true}, 198 opentracing.Tag{Key: LSMetaEvent_PropagationFormatKey, Value: format}). 199 Finish() 200 } 201 builtin, ok := format.(opentracing.BuiltinFormat) 202 if !ok { 203 return nil, opentracing.ErrUnsupportedFormat 204 } 205 return tracer.propagators[builtin].Extract(carrier) 206} 207 208func (tracer *tracerImpl) reconnectClient(now time.Time) { 209 conn, err := tracer.client.ConnectClient() 210 if err != nil { 211 emitEvent(newEventConnectionError(err)) 212 } else { 213 tracer.lock.Lock() 214 oldConn := tracer.connection 215 tracer.connection = conn 216 tracer.lock.Unlock() 217 218 oldConn.Close() 219 } 220} 221 222// Close flushes and then terminates the LightStep collector. Close may only be 223// called once; subsequent calls to Close are no-ops. 224func (tracer *tracerImpl) Close(ctx context.Context) { 225 tracer.closeOnce.Do(func() { 226 // notify report loop that we are closing 227 close(tracer.closeReportLoopChannel) 228 select { 229 case <-tracer.reportLoopClosedChannel: 230 tracer.Flush(ctx) 231 case <-ctx.Done(): 232 return 233 } 234 235 // now its safe to close the connection 236 tracer.lock.Lock() 237 conn := tracer.connection 238 tracer.connection = nil 239 tracer.lock.Unlock() 240 241 if conn != nil { 242 err := conn.Close() 243 if err != nil { 244 emitEvent(newEventConnectionError(err)) 245 } 246 } 247 }) 248} 249 250// RecordSpan records a finished Span. 251func (tracer *tracerImpl) RecordSpan(raw RawSpan) { 252 tracer.lock.Lock() 253 254 // Early-out for disabled runtimes 255 if tracer.disabled { 256 tracer.lock.Unlock() 257 return 258 } 259 260 tracer.buffer.addSpan(raw) 261 tracer.lock.Unlock() 262 263 if tracer.opts.Recorder != nil { 264 tracer.opts.Recorder.RecordSpan(raw) 265 } 266} 267 268// Flush sends all buffered data to the collector. 269func (tracer *tracerImpl) Flush(ctx context.Context) { 270 tracer.flushingLock.Lock() 271 defer tracer.flushingLock.Unlock() 272 273 if errorEvent := tracer.preFlush(); errorEvent != nil { 274 emitEvent(errorEvent) 275 return 276 } 277 278 if tracer.opts.MetaEventReportingEnabled && !tracer.firstReportHasRun { 279 opentracing.StartSpan(LSMetaEvent_TracerCreateOperation, 280 opentracing.Tag{Key: LSMetaEvent_MetaEventKey, Value: true}, 281 opentracing.Tag{Key: LSMetaEvent_TracerGuidKey, Value: tracer.reporterID}). 282 Finish() 283 tracer.firstReportHasRun = true 284 } 285 286 ctx, cancel := context.WithTimeout(ctx, tracer.opts.ReportTimeout) 287 defer cancel() 288 289 protoReq := tracer.converter.toReportRequest( 290 tracer.reporterID, 291 tracer.attributes, 292 tracer.accessToken, 293 &tracer.flushing, 294 ) 295 req, err := tracer.client.Translate(protoReq) 296 if err != nil { 297 errorEvent := newEventFlushError(err, FlushErrorTranslate) 298 emitEvent(errorEvent) 299 // call postflush to prevent the tracer from going into an invalid state. 300 emitEvent(tracer.postFlush(errorEvent)) 301 return 302 } 303 304 var reportErrorEvent *eventFlushError 305 resp, err := tracer.client.Report(ctx, req) 306 if err != nil { 307 reportErrorEvent = newEventFlushError(err, FlushErrorTransport) 308 } else if len(resp.GetErrors()) > 0 { 309 reportErrorEvent = newEventFlushError(fmt.Errorf(resp.GetErrors()[0]), FlushErrorReport) 310 } 311 312 if reportErrorEvent != nil { 313 emitEvent(reportErrorEvent) 314 } 315 emitEvent(tracer.postFlush(reportErrorEvent)) 316 317 if err == nil && resp.DevMode() { 318 tracer.metaEventReportingEnabled = true 319 } 320 321 if err == nil && !resp.DevMode() { 322 tracer.metaEventReportingEnabled = false 323 } 324 325 if err == nil && resp.Disable() { 326 tracer.Disable() 327 } 328} 329 330// preFlush handles lock-protected data manipulation before flushing 331func (tracer *tracerImpl) preFlush() *eventFlushError { 332 tracer.lock.Lock() 333 defer tracer.lock.Unlock() 334 335 if tracer.disabled { 336 return newEventFlushError(errFlushFailedTracerClosed, FlushErrorTracerDisabled) 337 } 338 339 if tracer.connection == nil { 340 return newEventFlushError(errFlushFailedTracerClosed, FlushErrorTracerClosed) 341 } 342 343 now := time.Now() 344 tracer.buffer, tracer.flushing = tracer.flushing, tracer.buffer 345 tracer.reportInFlight = true 346 tracer.flushing.setFlushing(now) 347 tracer.buffer.setCurrent(now) 348 tracer.lastReportAttempt = now 349 return nil 350} 351 352// postFlush handles lock-protected data manipulation after flushing 353func (tracer *tracerImpl) postFlush(flushEventError *eventFlushError) *eventStatusReport { 354 tracer.lock.Lock() 355 defer tracer.lock.Unlock() 356 357 tracer.reportInFlight = false 358 359 statusReportEvent := newEventStatusReport( 360 tracer.flushing.reportStart, 361 tracer.flushing.reportEnd, 362 len(tracer.flushing.rawSpans), 363 int(tracer.flushing.droppedSpanCount+tracer.buffer.droppedSpanCount), 364 int(tracer.flushing.logEncoderErrorCount+tracer.buffer.logEncoderErrorCount), 365 ) 366 367 if flushEventError == nil { 368 tracer.flushing.clear() 369 return statusReportEvent 370 } 371 372 switch flushEventError.State() { 373 case FlushErrorTranslate: 374 // When there's a translation error, we do not want to retry. 375 tracer.flushing.clear() 376 default: 377 // Restore the records that did not get sent correctly 378 tracer.buffer.mergeFrom(&tracer.flushing) 379 } 380 381 statusReportEvent.SetSentSpans(0) 382 383 return statusReportEvent 384} 385 386func (tracer *tracerImpl) Disable() { 387 tracer.lock.Lock() 388 if tracer.disabled { 389 tracer.lock.Unlock() 390 return 391 } 392 tracer.disabled = true 393 tracer.buffer.clear() 394 tracer.lock.Unlock() 395 396 emitEvent(newEventTracerDisabled()) 397} 398 399// Every MinReportingPeriod the reporting loop wakes up and checks to see if 400// either (a) the Runtime's max reporting period is about to expire (see 401// maxReportingPeriod()), (b) the number of buffered log records is 402// approaching kMaxBufferedLogs, or if (c) the number of buffered span records 403// is approaching kMaxBufferedSpans. If any of those conditions are true, 404// pending data is flushed to the remote peer. If not, the reporting loop waits 405// until the next cycle. See Runtime.maybeFlush() for details. 406// 407// This could alternatively be implemented using flush channels and so forth, 408// but that would introduce opportunities for client code to block on the 409// runtime library, and we want to avoid that at all costs (even dropping data, 410// which can certainly happen with high data rates and/or unresponsive remote 411// peers). 412 413func (tracer *tracerImpl) shouldFlushLocked(now time.Time) bool { 414 if now.Add(tracer.opts.MinReportingPeriod).Sub(tracer.lastReportAttempt) > tracer.opts.ReportingPeriod { 415 return true 416 } else if tracer.buffer.isHalfFull() { 417 return true 418 } 419 return false 420} 421 422func (tracer *tracerImpl) reportLoop() { 423 tickerChan := time.Tick(tracer.opts.MinReportingPeriod) 424 for { 425 select { 426 case <-tickerChan: 427 now := time.Now() 428 429 tracer.lock.Lock() 430 disabled := tracer.disabled 431 reconnect := !tracer.reportInFlight && tracer.client.ShouldReconnect() 432 shouldFlush := tracer.shouldFlushLocked(now) 433 tracer.lock.Unlock() 434 435 if disabled { 436 return 437 } 438 if shouldFlush { 439 tracer.Flush(context.Background()) 440 } 441 if reconnect { 442 tracer.reconnectClient(now) 443 } 444 case <-tracer.closeReportLoopChannel: 445 close(tracer.reportLoopClosedChannel) 446 return 447 } 448 } 449} 450