1// Copyright (c) 2017-2018 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger 16 17import ( 18 "fmt" 19 "io" 20 "math/rand" 21 "os" 22 "reflect" 23 "strconv" 24 "sync" 25 "time" 26 27 "github.com/opentracing/opentracing-go" 28 "github.com/opentracing/opentracing-go/ext" 29 30 "github.com/uber/jaeger-client-go/internal/baggage" 31 "github.com/uber/jaeger-client-go/internal/throttler" 32 "github.com/uber/jaeger-client-go/log" 33 "github.com/uber/jaeger-client-go/utils" 34) 35 36// Tracer implements opentracing.Tracer. 
type Tracer struct {
	// serviceName is the service name passed to NewTracer.
	serviceName string
	// hostIPv4 is the packed IPv4 address of this host; it stays 0 when the
	// address could not be determined or parsed in NewTracer.
	hostIPv4 uint32 // this is for zipkin endpoint conversion

	// sampler is consulted when a span is started and its sampling decision
	// has not been finalized yet.
	sampler SamplerV2
	// reporter receives finished spans that are sampled.
	reporter Reporter
	// metrics holds the counters updated when spans are started and finished.
	metrics Metrics
	// logger defaults to log.NullLogger when not provided via options.
	logger log.DebugLogger

	// timeNow supplies the span start time when the caller does not provide
	// one; defaults to time.Now.
	timeNow func() time.Time
	// randomNumber produces random 64-bit values used for trace/span IDs and
	// the process UUID.
	randomNumber func() uint64

	options struct {
		gen128Bit bool // whether to generate 128bit trace IDs
		// zipkinSharedRPCSpan makes an RPC-server span reuse the span ID and
		// parent ID of its parent context instead of getting a new span ID.
		zipkinSharedRPCSpan  bool
		highTraceIDGenerator func() uint64 // custom high trace ID generator
		// maxTagValueLength falls back to DefaultMaxTagValueLength when left at 0.
		maxTagValueLength           int
		noDebugFlagOnForcedSampling bool
		maxLogsPerSpan              int
		// more options to come
	}
	// allocator of Span objects
	spanAllocator SpanAllocator

	// injectors and extractors are keyed by propagation format.
	injectors  map[interface{}]Injector
	extractors map[interface{}]Extractor

	// observer is notified (OnStartSpan) for every span that is started.
	observer compositeObserver

	// tags are the tracer-level tags; process is built from them in NewTracer.
	tags    []Tag
	process Process

	// baggageRestrictionManager is optional; when nil, NewTracer builds the
	// baggageSetter from a default restriction manager.
	baggageRestrictionManager baggage.RestrictionManager
	baggageSetter             *baggageSetter

	// debugThrottler decides whether a debug span may be started for an
	// operation; defaults to throttler.DefaultThrottler{}.
	debugThrottler throttler.Throttler
}

// NewTracer creates a Tracer implementation that reports tracing to Jaeger.
// The returned io.Closer can be used in shutdown hooks to ensure that the internal
// queue of the Reporter is drained and all buffered spans are submitted to collectors.
// TODO (breaking change) return *Tracer only, without closer.
79func NewTracer( 80 serviceName string, 81 sampler Sampler, 82 reporter Reporter, 83 options ...TracerOption, 84) (opentracing.Tracer, io.Closer) { 85 t := &Tracer{ 86 serviceName: serviceName, 87 sampler: samplerV1toV2(sampler), 88 reporter: reporter, 89 injectors: make(map[interface{}]Injector), 90 extractors: make(map[interface{}]Extractor), 91 metrics: *NewNullMetrics(), 92 spanAllocator: simpleSpanAllocator{}, 93 } 94 95 for _, option := range options { 96 option(t) 97 } 98 99 // register default injectors/extractors unless they are already provided via options 100 textPropagator := NewTextMapPropagator(getDefaultHeadersConfig(), t.metrics) 101 t.addCodec(opentracing.TextMap, textPropagator, textPropagator) 102 103 httpHeaderPropagator := NewHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics) 104 t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator) 105 106 binaryPropagator := NewBinaryPropagator(t) 107 t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator) 108 109 // TODO remove after TChannel supports OpenTracing 110 interopPropagator := &jaegerTraceContextPropagator{tracer: t} 111 t.addCodec(SpanContextFormat, interopPropagator, interopPropagator) 112 113 zipkinPropagator := &zipkinPropagator{tracer: t} 114 t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator) 115 116 if t.baggageRestrictionManager != nil { 117 t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics) 118 } else { 119 t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics) 120 } 121 if t.debugThrottler == nil { 122 t.debugThrottler = throttler.DefaultThrottler{} 123 } 124 125 if t.randomNumber == nil { 126 seedGenerator := utils.NewRand(time.Now().UnixNano()) 127 pool := sync.Pool{ 128 New: func() interface{} { 129 return rand.NewSource(seedGenerator.Int63()) 130 }, 131 } 132 133 t.randomNumber = func() uint64 { 134 generator := pool.Get().(rand.Source) 135 number := 
uint64(generator.Int63()) 136 pool.Put(generator) 137 return number 138 } 139 } 140 if t.timeNow == nil { 141 t.timeNow = time.Now 142 } 143 if t.logger == nil { 144 t.logger = log.NullLogger 145 } 146 // Set tracer-level tags 147 t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion}) 148 if hostname, err := os.Hostname(); err == nil { 149 t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname}) 150 } 151 if ipval, ok := t.getTag(TracerIPTagKey); ok { 152 ipv4, err := utils.ParseIPToUint32(ipval.(string)) 153 if err != nil { 154 t.hostIPv4 = 0 155 t.logger.Error("Unable to convert the externally provided ip to uint32: " + err.Error()) 156 } else { 157 t.hostIPv4 = ipv4 158 } 159 } else if ip, err := utils.HostIP(); err == nil { 160 t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()}) 161 t.hostIPv4 = utils.PackIPAsUint32(ip) 162 } else { 163 t.logger.Error("Unable to determine this host's IP address: " + err.Error()) 164 } 165 166 if t.options.gen128Bit { 167 if t.options.highTraceIDGenerator == nil { 168 t.options.highTraceIDGenerator = t.randomNumber 169 } 170 } else if t.options.highTraceIDGenerator != nil { 171 t.logger.Error("Overriding high trace ID generator but not generating " + 172 "128 bit trace IDs, consider enabling the \"Gen128Bit\" option") 173 } 174 if t.options.maxTagValueLength == 0 { 175 t.options.maxTagValueLength = DefaultMaxTagValueLength 176 } 177 t.process = Process{ 178 Service: serviceName, 179 UUID: strconv.FormatUint(t.randomNumber(), 16), 180 Tags: t.tags, 181 } 182 if throttler, ok := t.debugThrottler.(ProcessSetter); ok { 183 throttler.SetProcess(t.process) 184 } 185 186 return t, t 187} 188 189// addCodec adds registers injector and extractor for given propagation format if not already defined. 
190func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) { 191 if _, ok := t.injectors[format]; !ok { 192 t.injectors[format] = injector 193 } 194 if _, ok := t.extractors[format]; !ok { 195 t.extractors[format] = extractor 196 } 197} 198 199// StartSpan implements StartSpan() method of opentracing.Tracer. 200func (t *Tracer) StartSpan( 201 operationName string, 202 options ...opentracing.StartSpanOption, 203) opentracing.Span { 204 sso := opentracing.StartSpanOptions{} 205 for _, o := range options { 206 o.Apply(&sso) 207 } 208 return t.startSpanWithOptions(operationName, sso) 209} 210 211func (t *Tracer) startSpanWithOptions( 212 operationName string, 213 options opentracing.StartSpanOptions, 214) opentracing.Span { 215 if options.StartTime.IsZero() { 216 options.StartTime = t.timeNow() 217 } 218 219 // Predicate whether the given span context is an empty reference 220 // or may be used as parent / debug ID / baggage items source 221 isEmptyReference := func(ctx SpanContext) bool { 222 return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0 223 } 224 225 var references []Reference 226 var parent SpanContext 227 var hasParent bool // need this because `parent` is a value, not reference 228 var ctx SpanContext 229 var isSelfRef bool 230 for _, ref := range options.References { 231 ctxRef, ok := ref.ReferencedContext.(SpanContext) 232 if !ok { 233 t.logger.Error(fmt.Sprintf( 234 "Reference contains invalid type of SpanReference: %s", 235 reflect.ValueOf(ref.ReferencedContext))) 236 continue 237 } 238 if isEmptyReference(ctxRef) { 239 continue 240 } 241 242 if ref.Type == selfRefType { 243 isSelfRef = true 244 ctx = ctxRef 245 continue 246 } 247 248 if ctxRef.IsValid() { 249 // we don't want empty context that contains only debug-id or baggage 250 references = append(references, Reference{Type: ref.Type, Context: ctxRef}) 251 } 252 253 if !hasParent { 254 parent = ctxRef 255 hasParent = ref.Type == 
opentracing.ChildOfRef 256 } 257 } 258 if !hasParent && !isEmptyReference(parent) { 259 // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from 260 // the FollowFromRef as the parent 261 hasParent = true 262 } 263 264 rpcServer := false 265 if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok { 266 rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum)) 267 } 268 269 var internalTags []Tag 270 newTrace := false 271 if !isSelfRef { 272 if !hasParent || !parent.IsValid() { 273 newTrace = true 274 ctx.traceID.Low = t.randomID() 275 if t.options.gen128Bit { 276 ctx.traceID.High = t.options.highTraceIDGenerator() 277 } 278 ctx.spanID = SpanID(ctx.traceID.Low) 279 ctx.parentID = 0 280 ctx.samplingState = &samplingState{ 281 localRootSpan: ctx.spanID, 282 } 283 if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) { 284 ctx.samplingState.setDebugAndSampled() 285 internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID}) 286 } 287 } else { 288 ctx.traceID = parent.traceID 289 if rpcServer && t.options.zipkinSharedRPCSpan { 290 // Support Zipkin's one-span-per-RPC model 291 ctx.spanID = parent.spanID 292 ctx.parentID = parent.parentID 293 } else { 294 ctx.spanID = SpanID(t.randomID()) 295 ctx.parentID = parent.spanID 296 } 297 ctx.samplingState = parent.samplingState 298 if parent.remote { 299 ctx.samplingState.setFinal() 300 ctx.samplingState.localRootSpan = ctx.spanID 301 } 302 } 303 if hasParent { 304 // copy baggage items 305 if l := len(parent.baggage); l > 0 { 306 ctx.baggage = make(map[string]string, len(parent.baggage)) 307 for k, v := range parent.baggage { 308 ctx.baggage[k] = v 309 } 310 } 311 } 312 } 313 314 sp := t.newSpan() 315 sp.context = ctx 316 sp.tracer = t 317 sp.operationName = operationName 318 sp.startTime = options.StartTime 319 sp.duration = 0 320 sp.references = references 321 sp.firstInProcess = rpcServer || sp.context.parentID == 0 
322 323 if !sp.context.isSamplingFinalized() { 324 decision := t.sampler.OnCreateSpan(sp) 325 sp.applySamplingDecision(decision, false) 326 } 327 sp.observer = t.observer.OnStartSpan(sp, operationName, options) 328 329 if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 { 330 if sp.tags == nil || cap(sp.tags) < tagsTotalLength { 331 sp.tags = make([]Tag, 0, tagsTotalLength) 332 } 333 sp.tags = append(sp.tags, internalTags...) 334 for k, v := range options.Tags { 335 sp.setTagInternal(k, v, false) 336 } 337 } 338 t.emitNewSpanMetrics(sp, newTrace) 339 return sp 340} 341 342// Inject implements Inject() method of opentracing.Tracer 343func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error { 344 c, ok := ctx.(SpanContext) 345 if !ok { 346 return opentracing.ErrInvalidSpanContext 347 } 348 if injector, ok := t.injectors[format]; ok { 349 return injector.Inject(c, carrier) 350 } 351 return opentracing.ErrUnsupportedFormat 352} 353 354// Extract implements Extract() method of opentracing.Tracer 355func (t *Tracer) Extract( 356 format interface{}, 357 carrier interface{}, 358) (opentracing.SpanContext, error) { 359 if extractor, ok := t.extractors[format]; ok { 360 spanCtx, err := extractor.Extract(carrier) 361 if err != nil { 362 return nil, err // ensure returned spanCtx is nil 363 } 364 spanCtx.remote = true 365 return spanCtx, nil 366 } 367 return nil, opentracing.ErrUnsupportedFormat 368} 369 370// Close releases all resources used by the Tracer and flushes any remaining buffered spans. 371func (t *Tracer) Close() error { 372 t.logger.Debugf("closing tracer") 373 t.reporter.Close() 374 t.sampler.Close() 375 if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok { 376 _ = mgr.Close() 377 } 378 if throttler, ok := t.debugThrottler.(io.Closer); ok { 379 _ = throttler.Close() 380 } 381 return nil 382} 383 384// Tags returns a slice of tracer-level tags. 
385func (t *Tracer) Tags() []opentracing.Tag { 386 tags := make([]opentracing.Tag, len(t.tags)) 387 for i, tag := range t.tags { 388 tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value} 389 } 390 return tags 391} 392 393// getTag returns the value of specific tag, if not exists, return nil. 394// TODO only used by tests, move there. 395func (t *Tracer) getTag(key string) (interface{}, bool) { 396 for _, tag := range t.tags { 397 if tag.key == key { 398 return tag.value, true 399 } 400 } 401 return nil, false 402} 403 404// newSpan returns an instance of a clean Span object. 405// If options.PoolSpans is true, the spans are retrieved from an object pool. 406func (t *Tracer) newSpan() *Span { 407 return t.spanAllocator.Get() 408} 409 410// emitNewSpanMetrics generates metrics on the number of started spans and traces. 411// newTrace param: we cannot simply check for parentID==0 because in Zipkin model the 412// server-side RPC span has the exact same trace/span/parent IDs as the 413// calling client-side span, but obviously the server side span is 414// no longer a root span of the trace. 
415func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) { 416 if !sp.context.isSamplingFinalized() { 417 t.metrics.SpansStartedDelayedSampling.Inc(1) 418 if newTrace { 419 t.metrics.TracesStartedDelayedSampling.Inc(1) 420 } 421 // joining a trace is not possible, because sampling decision inherited from upstream is final 422 } else if sp.context.IsSampled() { 423 t.metrics.SpansStartedSampled.Inc(1) 424 if newTrace { 425 t.metrics.TracesStartedSampled.Inc(1) 426 } else if sp.firstInProcess { 427 t.metrics.TracesJoinedSampled.Inc(1) 428 } 429 } else { 430 t.metrics.SpansStartedNotSampled.Inc(1) 431 if newTrace { 432 t.metrics.TracesStartedNotSampled.Inc(1) 433 } else if sp.firstInProcess { 434 t.metrics.TracesJoinedNotSampled.Inc(1) 435 } 436 } 437} 438 439func (t *Tracer) reportSpan(sp *Span) { 440 ctx := sp.SpanContext() 441 442 if !ctx.isSamplingFinalized() { 443 t.metrics.SpansFinishedDelayedSampling.Inc(1) 444 } else if ctx.IsSampled() { 445 t.metrics.SpansFinishedSampled.Inc(1) 446 } else { 447 t.metrics.SpansFinishedNotSampled.Inc(1) 448 } 449 450 // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span, 451 // and then Release() it when no longer needed. 452 // Otherwise, the span may be reused for another trace and its data may be overwritten. 453 if ctx.IsSampled() { 454 t.reporter.Report(sp) 455 } 456 457 sp.Release() 458} 459 460// randomID generates a random trace/span ID, using tracer.random() generator. 461// It never returns 0. 
462func (t *Tracer) randomID() uint64 { 463 val := t.randomNumber() 464 for val == 0 { 465 val = t.randomNumber() 466 } 467 return val 468} 469 470// (NB) span must hold the lock before making this call 471func (t *Tracer) setBaggage(sp *Span, key, value string) { 472 t.baggageSetter.setBaggage(sp, key, value) 473} 474 475// (NB) span must hold the lock before making this call 476func (t *Tracer) isDebugAllowed(operation string) bool { 477 return t.debugThrottler.IsAllowed(operation) 478} 479 480// Sampler returns the sampler given to the tracer at creation. 481func (t *Tracer) Sampler() SamplerV2 { 482 return t.sampler 483} 484 485// SelfRef creates an opentracing compliant SpanReference from a jaeger 486// SpanContext. This is a factory function in order to encapsulate jaeger specific 487// types. 488func SelfRef(ctx SpanContext) opentracing.SpanReference { 489 return opentracing.SpanReference{ 490 Type: selfRefType, 491 ReferencedContext: ctx, 492 } 493} 494