1// Copyright (c) 2017-2018 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18	"fmt"
19	"io"
20	"math/rand"
21	"os"
22	"reflect"
23	"strconv"
24	"sync"
25	"time"
26
27	"github.com/opentracing/opentracing-go"
28	"github.com/opentracing/opentracing-go/ext"
29
30	"github.com/uber/jaeger-client-go/internal/baggage"
31	"github.com/uber/jaeger-client-go/internal/throttler"
32	"github.com/uber/jaeger-client-go/log"
33	"github.com/uber/jaeger-client-go/utils"
34)
35
36// Tracer implements opentracing.Tracer.
37type Tracer struct {
38	serviceName string
39	hostIPv4    uint32 // this is for zipkin endpoint conversion
40
41	sampler  SamplerV2
42	reporter Reporter
43	metrics  Metrics
44	logger   log.DebugLogger
45
46	timeNow      func() time.Time
47	randomNumber func() uint64
48
49	options struct {
50		gen128Bit                   bool // whether to generate 128bit trace IDs
51		zipkinSharedRPCSpan         bool
52		highTraceIDGenerator        func() uint64 // custom high trace ID generator
53		maxTagValueLength           int
54		noDebugFlagOnForcedSampling bool
55		maxLogsPerSpan              int
56		// more options to come
57	}
58	// allocator of Span objects
59	spanAllocator SpanAllocator
60
61	injectors  map[interface{}]Injector
62	extractors map[interface{}]Extractor
63
64	observer compositeObserver
65
66	tags    []Tag
67	process Process
68
69	baggageRestrictionManager baggage.RestrictionManager
70	baggageSetter             *baggageSetter
71
72	debugThrottler throttler.Throttler
73}
74
75// NewTracer creates Tracer implementation that reports tracing to Jaeger.
76// The returned io.Closer can be used in shutdown hooks to ensure that the internal
77// queue of the Reporter is drained and all buffered spans are submitted to collectors.
78// TODO (breaking change) return *Tracer only, without closer.
79func NewTracer(
80	serviceName string,
81	sampler Sampler,
82	reporter Reporter,
83	options ...TracerOption,
84) (opentracing.Tracer, io.Closer) {
85	t := &Tracer{
86		serviceName:   serviceName,
87		sampler:       samplerV1toV2(sampler),
88		reporter:      reporter,
89		injectors:     make(map[interface{}]Injector),
90		extractors:    make(map[interface{}]Extractor),
91		metrics:       *NewNullMetrics(),
92		spanAllocator: simpleSpanAllocator{},
93	}
94
95	for _, option := range options {
96		option(t)
97	}
98
99	// register default injectors/extractors unless they are already provided via options
100	textPropagator := NewTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
101	t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
102
103	httpHeaderPropagator := NewHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
104	t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
105
106	binaryPropagator := NewBinaryPropagator(t)
107	t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
108
109	// TODO remove after TChannel supports OpenTracing
110	interopPropagator := &jaegerTraceContextPropagator{tracer: t}
111	t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
112
113	zipkinPropagator := &zipkinPropagator{tracer: t}
114	t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
115
116	if t.baggageRestrictionManager != nil {
117		t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
118	} else {
119		t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
120	}
121	if t.debugThrottler == nil {
122		t.debugThrottler = throttler.DefaultThrottler{}
123	}
124
125	if t.randomNumber == nil {
126		seedGenerator := utils.NewRand(time.Now().UnixNano())
127		pool := sync.Pool{
128			New: func() interface{} {
129				return rand.NewSource(seedGenerator.Int63())
130			},
131		}
132
133		t.randomNumber = func() uint64 {
134			generator := pool.Get().(rand.Source)
135			number := uint64(generator.Int63())
136			pool.Put(generator)
137			return number
138		}
139	}
140	if t.timeNow == nil {
141		t.timeNow = time.Now
142	}
143	if t.logger == nil {
144		t.logger = log.NullLogger
145	}
146	// Set tracer-level tags
147	t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
148	if hostname, err := os.Hostname(); err == nil {
149		t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
150	}
151	if ipval, ok := t.getTag(TracerIPTagKey); ok {
152		ipv4, err := utils.ParseIPToUint32(ipval.(string))
153		if err != nil {
154			t.hostIPv4 = 0
155			t.logger.Error("Unable to convert the externally provided ip to uint32: " + err.Error())
156		} else {
157			t.hostIPv4 = ipv4
158		}
159	} else if ip, err := utils.HostIP(); err == nil {
160		t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
161		t.hostIPv4 = utils.PackIPAsUint32(ip)
162	} else {
163		t.logger.Error("Unable to determine this host's IP address: " + err.Error())
164	}
165
166	if t.options.gen128Bit {
167		if t.options.highTraceIDGenerator == nil {
168			t.options.highTraceIDGenerator = t.randomNumber
169		}
170	} else if t.options.highTraceIDGenerator != nil {
171		t.logger.Error("Overriding high trace ID generator but not generating " +
172			"128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
173	}
174	if t.options.maxTagValueLength == 0 {
175		t.options.maxTagValueLength = DefaultMaxTagValueLength
176	}
177	t.process = Process{
178		Service: serviceName,
179		UUID:    strconv.FormatUint(t.randomNumber(), 16),
180		Tags:    t.tags,
181	}
182	if throttler, ok := t.debugThrottler.(ProcessSetter); ok {
183		throttler.SetProcess(t.process)
184	}
185
186	return t, t
187}
188
189// addCodec adds registers injector and extractor for given propagation format if not already defined.
190func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
191	if _, ok := t.injectors[format]; !ok {
192		t.injectors[format] = injector
193	}
194	if _, ok := t.extractors[format]; !ok {
195		t.extractors[format] = extractor
196	}
197}
198
199// StartSpan implements StartSpan() method of opentracing.Tracer.
200func (t *Tracer) StartSpan(
201	operationName string,
202	options ...opentracing.StartSpanOption,
203) opentracing.Span {
204	sso := opentracing.StartSpanOptions{}
205	for _, o := range options {
206		o.Apply(&sso)
207	}
208	return t.startSpanWithOptions(operationName, sso)
209}
210
211func (t *Tracer) startSpanWithOptions(
212	operationName string,
213	options opentracing.StartSpanOptions,
214) opentracing.Span {
215	if options.StartTime.IsZero() {
216		options.StartTime = t.timeNow()
217	}
218
219	// Predicate whether the given span context is an empty reference
220	// or may be used as parent / debug ID / baggage items source
221	isEmptyReference := func(ctx SpanContext) bool {
222		return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0
223	}
224
225	var references []Reference
226	var parent SpanContext
227	var hasParent bool // need this because `parent` is a value, not reference
228	var ctx SpanContext
229	var isSelfRef bool
230	for _, ref := range options.References {
231		ctxRef, ok := ref.ReferencedContext.(SpanContext)
232		if !ok {
233			t.logger.Error(fmt.Sprintf(
234				"Reference contains invalid type of SpanReference: %s",
235				reflect.ValueOf(ref.ReferencedContext)))
236			continue
237		}
238		if isEmptyReference(ctxRef) {
239			continue
240		}
241
242		if ref.Type == selfRefType {
243			isSelfRef = true
244			ctx = ctxRef
245			continue
246		}
247
248		if ctxRef.IsValid() {
249			// we don't want empty context that contains only debug-id or baggage
250			references = append(references, Reference{Type: ref.Type, Context: ctxRef})
251		}
252
253		if !hasParent {
254			parent = ctxRef
255			hasParent = ref.Type == opentracing.ChildOfRef
256		}
257	}
258	if !hasParent && !isEmptyReference(parent) {
259		// If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
260		// the FollowFromRef as the parent
261		hasParent = true
262	}
263
264	rpcServer := false
265	if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
266		rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
267	}
268
269	var internalTags []Tag
270	newTrace := false
271	if !isSelfRef {
272		if !hasParent || !parent.IsValid() {
273			newTrace = true
274			ctx.traceID.Low = t.randomID()
275			if t.options.gen128Bit {
276				ctx.traceID.High = t.options.highTraceIDGenerator()
277			}
278			ctx.spanID = SpanID(ctx.traceID.Low)
279			ctx.parentID = 0
280			ctx.samplingState = &samplingState{
281				localRootSpan: ctx.spanID,
282			}
283			if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
284				ctx.samplingState.setDebugAndSampled()
285				internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID})
286			}
287		} else {
288			ctx.traceID = parent.traceID
289			if rpcServer && t.options.zipkinSharedRPCSpan {
290				// Support Zipkin's one-span-per-RPC model
291				ctx.spanID = parent.spanID
292				ctx.parentID = parent.parentID
293			} else {
294				ctx.spanID = SpanID(t.randomID())
295				ctx.parentID = parent.spanID
296			}
297			ctx.samplingState = parent.samplingState
298			if parent.remote {
299				ctx.samplingState.setFinal()
300				ctx.samplingState.localRootSpan = ctx.spanID
301			}
302		}
303		if hasParent {
304			// copy baggage items
305			if l := len(parent.baggage); l > 0 {
306				ctx.baggage = make(map[string]string, len(parent.baggage))
307				for k, v := range parent.baggage {
308					ctx.baggage[k] = v
309				}
310			}
311		}
312	}
313
314	sp := t.newSpan()
315	sp.context = ctx
316	sp.tracer = t
317	sp.operationName = operationName
318	sp.startTime = options.StartTime
319	sp.duration = 0
320	sp.references = references
321	sp.firstInProcess = rpcServer || sp.context.parentID == 0
322
323	if !sp.context.isSamplingFinalized() {
324		decision := t.sampler.OnCreateSpan(sp)
325		sp.applySamplingDecision(decision, false)
326	}
327	sp.observer = t.observer.OnStartSpan(sp, operationName, options)
328
329	if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 {
330		if sp.tags == nil || cap(sp.tags) < tagsTotalLength {
331			sp.tags = make([]Tag, 0, tagsTotalLength)
332		}
333		sp.tags = append(sp.tags, internalTags...)
334		for k, v := range options.Tags {
335			sp.setTagInternal(k, v, false)
336		}
337	}
338	t.emitNewSpanMetrics(sp, newTrace)
339	return sp
340}
341
342// Inject implements Inject() method of opentracing.Tracer
343func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
344	c, ok := ctx.(SpanContext)
345	if !ok {
346		return opentracing.ErrInvalidSpanContext
347	}
348	if injector, ok := t.injectors[format]; ok {
349		return injector.Inject(c, carrier)
350	}
351	return opentracing.ErrUnsupportedFormat
352}
353
354// Extract implements Extract() method of opentracing.Tracer
355func (t *Tracer) Extract(
356	format interface{},
357	carrier interface{},
358) (opentracing.SpanContext, error) {
359	if extractor, ok := t.extractors[format]; ok {
360		spanCtx, err := extractor.Extract(carrier)
361		if err != nil {
362			return nil, err // ensure returned spanCtx is nil
363		}
364		spanCtx.remote = true
365		return spanCtx, nil
366	}
367	return nil, opentracing.ErrUnsupportedFormat
368}
369
370// Close releases all resources used by the Tracer and flushes any remaining buffered spans.
371func (t *Tracer) Close() error {
372	t.logger.Debugf("closing tracer")
373	t.reporter.Close()
374	t.sampler.Close()
375	if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
376		_ = mgr.Close()
377	}
378	if throttler, ok := t.debugThrottler.(io.Closer); ok {
379		_ = throttler.Close()
380	}
381	return nil
382}
383
384// Tags returns a slice of tracer-level tags.
385func (t *Tracer) Tags() []opentracing.Tag {
386	tags := make([]opentracing.Tag, len(t.tags))
387	for i, tag := range t.tags {
388		tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
389	}
390	return tags
391}
392
393// getTag returns the value of specific tag, if not exists, return nil.
394// TODO only used by tests, move there.
395func (t *Tracer) getTag(key string) (interface{}, bool) {
396	for _, tag := range t.tags {
397		if tag.key == key {
398			return tag.value, true
399		}
400	}
401	return nil, false
402}
403
404// newSpan returns an instance of a clean Span object.
405// If options.PoolSpans is true, the spans are retrieved from an object pool.
406func (t *Tracer) newSpan() *Span {
407	return t.spanAllocator.Get()
408}
409
410// emitNewSpanMetrics generates metrics on the number of started spans and traces.
411// newTrace param: we cannot simply check for parentID==0 because in Zipkin model the
412// server-side RPC span has the exact same trace/span/parent IDs as the
413// calling client-side span, but obviously the server side span is
414// no longer a root span of the trace.
415func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
416	if !sp.context.isSamplingFinalized() {
417		t.metrics.SpansStartedDelayedSampling.Inc(1)
418		if newTrace {
419			t.metrics.TracesStartedDelayedSampling.Inc(1)
420		}
421		// joining a trace is not possible, because sampling decision inherited from upstream is final
422	} else if sp.context.IsSampled() {
423		t.metrics.SpansStartedSampled.Inc(1)
424		if newTrace {
425			t.metrics.TracesStartedSampled.Inc(1)
426		} else if sp.firstInProcess {
427			t.metrics.TracesJoinedSampled.Inc(1)
428		}
429	} else {
430		t.metrics.SpansStartedNotSampled.Inc(1)
431		if newTrace {
432			t.metrics.TracesStartedNotSampled.Inc(1)
433		} else if sp.firstInProcess {
434			t.metrics.TracesJoinedNotSampled.Inc(1)
435		}
436	}
437}
438
439func (t *Tracer) reportSpan(sp *Span) {
440	ctx := sp.SpanContext()
441
442	if !ctx.isSamplingFinalized() {
443		t.metrics.SpansFinishedDelayedSampling.Inc(1)
444	} else if ctx.IsSampled() {
445		t.metrics.SpansFinishedSampled.Inc(1)
446	} else {
447		t.metrics.SpansFinishedNotSampled.Inc(1)
448	}
449
450	// Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
451	// and then Release() it when no longer needed.
452	// Otherwise, the span may be reused for another trace and its data may be overwritten.
453	if ctx.IsSampled() {
454		t.reporter.Report(sp)
455	}
456
457	sp.Release()
458}
459
460// randomID generates a random trace/span ID, using tracer.random() generator.
461// It never returns 0.
462func (t *Tracer) randomID() uint64 {
463	val := t.randomNumber()
464	for val == 0 {
465		val = t.randomNumber()
466	}
467	return val
468}
469
470// (NB) span must hold the lock before making this call
471func (t *Tracer) setBaggage(sp *Span, key, value string) {
472	t.baggageSetter.setBaggage(sp, key, value)
473}
474
475// (NB) span must hold the lock before making this call
476func (t *Tracer) isDebugAllowed(operation string) bool {
477	return t.debugThrottler.IsAllowed(operation)
478}
479
480// Sampler returns the sampler given to the tracer at creation.
481func (t *Tracer) Sampler() SamplerV2 {
482	return t.sampler
483}
484
485// SelfRef creates an opentracing compliant SpanReference from a jaeger
486// SpanContext. This is a factory function in order to encapsulate jaeger specific
487// types.
488func SelfRef(ctx SpanContext) opentracing.SpanReference {
489	return opentracing.SpanReference{
490		Type:              selfRefType,
491		ReferencedContext: ctx,
492	}
493}
494