1// Copyright (c) 2017-2018 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger 16 17import ( 18 "fmt" 19 "io" 20 "math/rand" 21 "os" 22 "reflect" 23 "strconv" 24 "sync" 25 "time" 26 27 "github.com/opentracing/opentracing-go" 28 "github.com/opentracing/opentracing-go/ext" 29 30 "github.com/uber/jaeger-client-go/internal/baggage" 31 "github.com/uber/jaeger-client-go/internal/throttler" 32 "github.com/uber/jaeger-client-go/log" 33 "github.com/uber/jaeger-client-go/utils" 34) 35 36// Tracer implements opentracing.Tracer. 
type Tracer struct {
	// serviceName is the name passed to NewTracer; it is also copied into process.Service.
	serviceName string
	// hostIPv4 is the host IP packed into a uint32 by NewTracer (left at zero when the
	// IP cannot be determined); this is for zipkin endpoint conversion.
	hostIPv4 uint32

	// sampler is asked (IsSampled) whether a newly started trace should be sampled.
	sampler Sampler
	// reporter receives finished spans whose context is sampled.
	reporter Reporter
	// metrics holds the counters incremented when spans/traces start and finish;
	// NewTracer initializes it from NewNullMetrics before options are applied.
	metrics Metrics
	// logger defaults to log.NullLogger when no option sets it.
	logger log.Logger

	// timeNow supplies the start time for spans that do not specify one; defaults to time.Now.
	timeNow func() time.Time
	// randomNumber is the source for trace/span IDs and the process UUID; NewTracer
	// installs a default generator when no option sets it.
	randomNumber func() uint64

	options struct {
		// poolSpans makes newSpan/reportSpan take Span objects from, and return them to, spanPool.
		poolSpans bool
		// gen128Bit controls whether to generate 128bit trace IDs.
		gen128Bit bool
		// zipkinSharedRPCSpan makes an RPC-server span reuse the parent's span ID and
		// parent ID (Zipkin's one-span-per-RPC model) instead of getting a fresh span ID.
		zipkinSharedRPCSpan bool
		// highTraceIDGenerator is a custom high trace ID generator; when gen128Bit is set
		// and this is nil, NewTracer falls back to randomNumber.
		highTraceIDGenerator func() uint64
		// maxTagValueLength is replaced by DefaultMaxTagValueLength when left at zero.
		maxTagValueLength int
		// more options to come
	}
	// pool for Span objects
	spanPool sync.Pool

	// injectors and extractors are keyed by propagation format; entries supplied via
	// options take precedence over the defaults registered by NewTracer.
	injectors  map[interface{}]Injector
	extractors map[interface{}]Extractor

	// observer is notified (OnStartSpan) for every span started by this tracer.
	observer compositeObserver

	// tags are the tracer-level tags; NewTracer appends the client version and, when
	// they can be determined, the hostname and host IP.
	tags []Tag
	// process bundles the service name, a random UUID and tags; it is handed to the
	// debug throttler when that throttler implements ProcessSetter.
	process Process

	// baggageRestrictionManager is optional; when nil, NewTracer builds the baggage
	// setter from baggage.NewDefaultRestrictionManager(0) instead.
	baggageRestrictionManager baggage.RestrictionManager
	// baggageSetter performs the actual work behind setBaggage.
	baggageSetter *baggageSetter

	// debugThrottler decides (IsAllowed) whether a debug-ID request may force sampling;
	// defaults to throttler.DefaultThrottler{}.
	debugThrottler throttler.Throttler
}

// NewTracer creates Tracer implementation that reports tracing to Jaeger.
// The returned io.Closer can be used in shutdown hooks to ensure that the internal
// queue of the Reporter is drained and all buffered spans are submitted to collectors.
// Both return values refer to the same *Tracer.
77func NewTracer( 78 serviceName string, 79 sampler Sampler, 80 reporter Reporter, 81 options ...TracerOption, 82) (opentracing.Tracer, io.Closer) { 83 t := &Tracer{ 84 serviceName: serviceName, 85 sampler: sampler, 86 reporter: reporter, 87 injectors: make(map[interface{}]Injector), 88 extractors: make(map[interface{}]Extractor), 89 metrics: *NewNullMetrics(), 90 spanPool: sync.Pool{New: func() interface{} { 91 return &Span{} 92 }}, 93 } 94 95 for _, option := range options { 96 option(t) 97 } 98 99 // register default injectors/extractors unless they are already provided via options 100 textPropagator := newTextMapPropagator(getDefaultHeadersConfig(), t.metrics) 101 t.addCodec(opentracing.TextMap, textPropagator, textPropagator) 102 103 httpHeaderPropagator := newHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics) 104 t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator) 105 106 binaryPropagator := newBinaryPropagator(t) 107 t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator) 108 109 // TODO remove after TChannel supports OpenTracing 110 interopPropagator := &jaegerTraceContextPropagator{tracer: t} 111 t.addCodec(SpanContextFormat, interopPropagator, interopPropagator) 112 113 zipkinPropagator := &zipkinPropagator{tracer: t} 114 t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator) 115 116 if t.baggageRestrictionManager != nil { 117 t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics) 118 } else { 119 t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics) 120 } 121 if t.debugThrottler == nil { 122 t.debugThrottler = throttler.DefaultThrottler{} 123 } 124 125 if t.randomNumber == nil { 126 seedGenerator := utils.NewRand(time.Now().UnixNano()) 127 pool := sync.Pool{ 128 New: func() interface{} { 129 return rand.NewSource(seedGenerator.Int63()) 130 }, 131 } 132 133 t.randomNumber = func() uint64 { 134 generator := pool.Get().(rand.Source) 135 number := 
uint64(generator.Int63()) 136 pool.Put(generator) 137 return number 138 } 139 } 140 if t.timeNow == nil { 141 t.timeNow = time.Now 142 } 143 if t.logger == nil { 144 t.logger = log.NullLogger 145 } 146 // Set tracer-level tags 147 t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion}) 148 if hostname, err := os.Hostname(); err == nil { 149 t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname}) 150 } 151 if ip, err := utils.HostIP(); err == nil { 152 t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()}) 153 t.hostIPv4 = utils.PackIPAsUint32(ip) 154 } else { 155 t.logger.Error("Unable to determine this host's IP address: " + err.Error()) 156 } 157 158 if t.options.gen128Bit { 159 if t.options.highTraceIDGenerator == nil { 160 t.options.highTraceIDGenerator = t.randomNumber 161 } 162 } else if t.options.highTraceIDGenerator != nil { 163 t.logger.Error("Overriding high trace ID generator but not generating " + 164 "128 bit trace IDs, consider enabling the \"Gen128Bit\" option") 165 } 166 if t.options.maxTagValueLength == 0 { 167 t.options.maxTagValueLength = DefaultMaxTagValueLength 168 } 169 t.process = Process{ 170 Service: serviceName, 171 UUID: strconv.FormatUint(t.randomNumber(), 16), 172 Tags: t.tags, 173 } 174 if throttler, ok := t.debugThrottler.(ProcessSetter); ok { 175 throttler.SetProcess(t.process) 176 } 177 178 return t, t 179} 180 181// addCodec adds registers injector and extractor for given propagation format if not already defined. 182func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) { 183 if _, ok := t.injectors[format]; !ok { 184 t.injectors[format] = injector 185 } 186 if _, ok := t.extractors[format]; !ok { 187 t.extractors[format] = extractor 188 } 189} 190 191// StartSpan implements StartSpan() method of opentracing.Tracer. 
192func (t *Tracer) StartSpan( 193 operationName string, 194 options ...opentracing.StartSpanOption, 195) opentracing.Span { 196 sso := opentracing.StartSpanOptions{} 197 for _, o := range options { 198 o.Apply(&sso) 199 } 200 return t.startSpanWithOptions(operationName, sso) 201} 202 203func (t *Tracer) startSpanWithOptions( 204 operationName string, 205 options opentracing.StartSpanOptions, 206) opentracing.Span { 207 if options.StartTime.IsZero() { 208 options.StartTime = t.timeNow() 209 } 210 211 // Predicate whether the given span context is a valid reference 212 // which may be used as parent / debug ID / baggage items source 213 isValidReference := func(ctx SpanContext) bool { 214 return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0 215 } 216 217 var references []Reference 218 var parent SpanContext 219 var hasParent bool // need this because `parent` is a value, not reference 220 for _, ref := range options.References { 221 ctx, ok := ref.ReferencedContext.(SpanContext) 222 if !ok { 223 t.logger.Error(fmt.Sprintf( 224 "Reference contains invalid type of SpanReference: %s", 225 reflect.ValueOf(ref.ReferencedContext))) 226 continue 227 } 228 if !isValidReference(ctx) { 229 continue 230 } 231 references = append(references, Reference{Type: ref.Type, Context: ctx}) 232 if !hasParent { 233 parent = ctx 234 hasParent = ref.Type == opentracing.ChildOfRef 235 } 236 } 237 if !hasParent && isValidReference(parent) { 238 // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from 239 // the FollowFromRef as the parent 240 hasParent = true 241 } 242 243 rpcServer := false 244 if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok { 245 rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum)) 246 } 247 248 var samplerTags []Tag 249 var ctx SpanContext 250 newTrace := false 251 if !hasParent || !parent.IsValid() { 252 newTrace = true 253 ctx.traceID.Low = t.randomID() 254 if t.options.gen128Bit { 
255 ctx.traceID.High = t.options.highTraceIDGenerator() 256 } 257 ctx.spanID = SpanID(ctx.traceID.Low) 258 ctx.parentID = 0 259 ctx.flags = byte(0) 260 if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) { 261 ctx.flags |= (flagSampled | flagDebug) 262 samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}} 263 } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled { 264 ctx.flags |= flagSampled 265 samplerTags = tags 266 } 267 } else { 268 ctx.traceID = parent.traceID 269 if rpcServer && t.options.zipkinSharedRPCSpan { 270 // Support Zipkin's one-span-per-RPC model 271 ctx.spanID = parent.spanID 272 ctx.parentID = parent.parentID 273 } else { 274 ctx.spanID = SpanID(t.randomID()) 275 ctx.parentID = parent.spanID 276 } 277 ctx.flags = parent.flags 278 } 279 if hasParent { 280 // copy baggage items 281 if l := len(parent.baggage); l > 0 { 282 ctx.baggage = make(map[string]string, len(parent.baggage)) 283 for k, v := range parent.baggage { 284 ctx.baggage[k] = v 285 } 286 } 287 } 288 289 sp := t.newSpan() 290 sp.context = ctx 291 sp.observer = t.observer.OnStartSpan(sp, operationName, options) 292 return t.startSpanInternal( 293 sp, 294 operationName, 295 options.StartTime, 296 samplerTags, 297 options.Tags, 298 newTrace, 299 rpcServer, 300 references, 301 ) 302} 303 304// Inject implements Inject() method of opentracing.Tracer 305func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error { 306 c, ok := ctx.(SpanContext) 307 if !ok { 308 return opentracing.ErrInvalidSpanContext 309 } 310 if injector, ok := t.injectors[format]; ok { 311 return injector.Inject(c, carrier) 312 } 313 return opentracing.ErrUnsupportedFormat 314} 315 316// Extract implements Extract() method of opentracing.Tracer 317func (t *Tracer) Extract( 318 format interface{}, 319 carrier interface{}, 320) (opentracing.SpanContext, error) { 321 if extractor, ok := t.extractors[format]; ok 
{ 322 return extractor.Extract(carrier) 323 } 324 return nil, opentracing.ErrUnsupportedFormat 325} 326 327// Close releases all resources used by the Tracer and flushes any remaining buffered spans. 328func (t *Tracer) Close() error { 329 t.reporter.Close() 330 t.sampler.Close() 331 if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok { 332 mgr.Close() 333 } 334 if throttler, ok := t.debugThrottler.(io.Closer); ok { 335 throttler.Close() 336 } 337 return nil 338} 339 340// Tags returns a slice of tracer-level tags. 341func (t *Tracer) Tags() []opentracing.Tag { 342 tags := make([]opentracing.Tag, len(t.tags)) 343 for i, tag := range t.tags { 344 tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value} 345 } 346 return tags 347} 348 349// newSpan returns an instance of a clean Span object. 350// If options.PoolSpans is true, the spans are retrieved from an object pool. 351func (t *Tracer) newSpan() *Span { 352 if !t.options.poolSpans { 353 return &Span{} 354 } 355 sp := t.spanPool.Get().(*Span) 356 sp.context = emptyContext 357 sp.tracer = nil 358 sp.tags = nil 359 sp.logs = nil 360 return sp 361} 362 363func (t *Tracer) startSpanInternal( 364 sp *Span, 365 operationName string, 366 startTime time.Time, 367 internalTags []Tag, 368 tags opentracing.Tags, 369 newTrace bool, 370 rpcServer bool, 371 references []Reference, 372) *Span { 373 sp.tracer = t 374 sp.operationName = operationName 375 sp.startTime = startTime 376 sp.duration = 0 377 sp.references = references 378 sp.firstInProcess = rpcServer || sp.context.parentID == 0 379 if len(tags) > 0 || len(internalTags) > 0 { 380 sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags)) 381 copy(sp.tags, internalTags) 382 for k, v := range tags { 383 sp.observer.OnSetTag(k, v) 384 if k == string(ext.SamplingPriority) && !setSamplingPriority(sp, v) { 385 continue 386 } 387 sp.setTagNoLocking(k, v) 388 } 389 } 390 // emit metrics 391 if sp.context.IsSampled() { 392 t.metrics.SpansStartedSampled.Inc(1) 
393 if newTrace { 394 // We cannot simply check for parentID==0 because in Zipkin model the 395 // server-side RPC span has the exact same trace/span/parent IDs as the 396 // calling client-side span, but obviously the server side span is 397 // no longer a root span of the trace. 398 t.metrics.TracesStartedSampled.Inc(1) 399 } else if sp.firstInProcess { 400 t.metrics.TracesJoinedSampled.Inc(1) 401 } 402 } else { 403 t.metrics.SpansStartedNotSampled.Inc(1) 404 if newTrace { 405 t.metrics.TracesStartedNotSampled.Inc(1) 406 } else if sp.firstInProcess { 407 t.metrics.TracesJoinedNotSampled.Inc(1) 408 } 409 } 410 return sp 411} 412 413func (t *Tracer) reportSpan(sp *Span) { 414 t.metrics.SpansFinished.Inc(1) 415 if sp.context.IsSampled() { 416 t.reporter.Report(sp) 417 } 418 if t.options.poolSpans { 419 t.spanPool.Put(sp) 420 } 421} 422 423// randomID generates a random trace/span ID, using tracer.random() generator. 424// It never returns 0. 425func (t *Tracer) randomID() uint64 { 426 val := t.randomNumber() 427 for val == 0 { 428 val = t.randomNumber() 429 } 430 return val 431} 432 433// (NB) span must hold the lock before making this call 434func (t *Tracer) setBaggage(sp *Span, key, value string) { 435 t.baggageSetter.setBaggage(sp, key, value) 436} 437 438// (NB) span must hold the lock before making this call 439func (t *Tracer) isDebugAllowed(operation string) bool { 440 return t.debugThrottler.IsAllowed(operation) 441} 442