1// Copyright (c) 2017-2018 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18	"fmt"
19	"io"
20	"math/rand"
21	"os"
22	"reflect"
23	"strconv"
24	"sync"
25	"time"
26
27	"github.com/opentracing/opentracing-go"
28	"github.com/opentracing/opentracing-go/ext"
29
30	"github.com/uber/jaeger-client-go/internal/baggage"
31	"github.com/uber/jaeger-client-go/internal/throttler"
32	"github.com/uber/jaeger-client-go/log"
33	"github.com/uber/jaeger-client-go/utils"
34)
35
// Tracer implements opentracing.Tracer.
//
// Instances are built by NewTracer, which first applies the supplied
// TracerOptions and then fills in defaults for collaborators that were left
// unset. NewTracer also returns the same value as the io.Closer (see Close).
type Tracer struct {
	// serviceName is the name given to NewTracer; it is also copied into
	// process.Service. hostIPv4 is the packed form of the address returned by
	// utils.HostIP and stays zero when that lookup fails.
	serviceName string
	hostIPv4    uint32 // this is for zipkin endpoint conversion

	// sampler is consulted when a new trace is started, reporter receives
	// finished spans whose context is sampled, metrics holds the counters
	// incremented on span start/finish, and logger receives tracer-level
	// error messages (log.NullLogger when nothing else was configured).
	sampler  Sampler
	reporter Reporter
	metrics  Metrics
	logger   log.Logger

	// timeNow supplies span start times when the caller gives none (defaults
	// to time.Now). randomNumber produces the raw random values used for
	// trace/span IDs and for the process UUID.
	timeNow      func() time.Time
	randomNumber func() uint64

	options struct {
		poolSpans            bool
		gen128Bit            bool // whether to generate 128bit trace IDs
		zipkinSharedRPCSpan  bool
		highTraceIDGenerator func() uint64 // custom high trace ID generator
		maxTagValueLength    int
		// more options to come
	}
	// pool for Span objects; only used when options.poolSpans is true
	spanPool sync.Pool

	// injectors and extractors are keyed by propagation format; NewTracer
	// registers the default codecs only for formats not already present.
	injectors  map[interface{}]Injector
	extractors map[interface{}]Extractor

	// observer is notified via OnStartSpan for every span that is started.
	observer compositeObserver

	// tags are the tracer-level tags (client version, hostname, IP, plus any
	// added by options); process bundles the service name, a random UUID and
	// those tags.
	tags    []Tag
	process Process

	// baggageRestrictionManager may be nil, in which case NewTracer builds the
	// baggageSetter from baggage.NewDefaultRestrictionManager(0).
	baggageRestrictionManager baggage.RestrictionManager
	baggageSetter             *baggageSetter

	// debugThrottler decides whether a debug-ID request may force sampling;
	// defaults to throttler.DefaultThrottler{}.
	debugThrottler throttler.Throttler
}
73
74// NewTracer creates Tracer implementation that reports tracing to Jaeger.
75// The returned io.Closer can be used in shutdown hooks to ensure that the internal
76// queue of the Reporter is drained and all buffered spans are submitted to collectors.
77func NewTracer(
78	serviceName string,
79	sampler Sampler,
80	reporter Reporter,
81	options ...TracerOption,
82) (opentracing.Tracer, io.Closer) {
83	t := &Tracer{
84		serviceName: serviceName,
85		sampler:     sampler,
86		reporter:    reporter,
87		injectors:   make(map[interface{}]Injector),
88		extractors:  make(map[interface{}]Extractor),
89		metrics:     *NewNullMetrics(),
90		spanPool: sync.Pool{New: func() interface{} {
91			return &Span{}
92		}},
93	}
94
95	for _, option := range options {
96		option(t)
97	}
98
99	// register default injectors/extractors unless they are already provided via options
100	textPropagator := newTextMapPropagator(getDefaultHeadersConfig(), t.metrics)
101	t.addCodec(opentracing.TextMap, textPropagator, textPropagator)
102
103	httpHeaderPropagator := newHTTPHeaderPropagator(getDefaultHeadersConfig(), t.metrics)
104	t.addCodec(opentracing.HTTPHeaders, httpHeaderPropagator, httpHeaderPropagator)
105
106	binaryPropagator := newBinaryPropagator(t)
107	t.addCodec(opentracing.Binary, binaryPropagator, binaryPropagator)
108
109	// TODO remove after TChannel supports OpenTracing
110	interopPropagator := &jaegerTraceContextPropagator{tracer: t}
111	t.addCodec(SpanContextFormat, interopPropagator, interopPropagator)
112
113	zipkinPropagator := &zipkinPropagator{tracer: t}
114	t.addCodec(ZipkinSpanFormat, zipkinPropagator, zipkinPropagator)
115
116	if t.baggageRestrictionManager != nil {
117		t.baggageSetter = newBaggageSetter(t.baggageRestrictionManager, &t.metrics)
118	} else {
119		t.baggageSetter = newBaggageSetter(baggage.NewDefaultRestrictionManager(0), &t.metrics)
120	}
121	if t.debugThrottler == nil {
122		t.debugThrottler = throttler.DefaultThrottler{}
123	}
124
125	if t.randomNumber == nil {
126		seedGenerator := utils.NewRand(time.Now().UnixNano())
127		pool := sync.Pool{
128			New: func() interface{} {
129				return rand.NewSource(seedGenerator.Int63())
130			},
131		}
132
133		t.randomNumber = func() uint64 {
134			generator := pool.Get().(rand.Source)
135			number := uint64(generator.Int63())
136			pool.Put(generator)
137			return number
138		}
139	}
140	if t.timeNow == nil {
141		t.timeNow = time.Now
142	}
143	if t.logger == nil {
144		t.logger = log.NullLogger
145	}
146	// Set tracer-level tags
147	t.tags = append(t.tags, Tag{key: JaegerClientVersionTagKey, value: JaegerClientVersion})
148	if hostname, err := os.Hostname(); err == nil {
149		t.tags = append(t.tags, Tag{key: TracerHostnameTagKey, value: hostname})
150	}
151	if ip, err := utils.HostIP(); err == nil {
152		t.tags = append(t.tags, Tag{key: TracerIPTagKey, value: ip.String()})
153		t.hostIPv4 = utils.PackIPAsUint32(ip)
154	} else {
155		t.logger.Error("Unable to determine this host's IP address: " + err.Error())
156	}
157
158	if t.options.gen128Bit {
159		if t.options.highTraceIDGenerator == nil {
160			t.options.highTraceIDGenerator = t.randomNumber
161		}
162	} else if t.options.highTraceIDGenerator != nil {
163		t.logger.Error("Overriding high trace ID generator but not generating " +
164			"128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
165	}
166	if t.options.maxTagValueLength == 0 {
167		t.options.maxTagValueLength = DefaultMaxTagValueLength
168	}
169	t.process = Process{
170		Service: serviceName,
171		UUID:    strconv.FormatUint(t.randomNumber(), 16),
172		Tags:    t.tags,
173	}
174	if throttler, ok := t.debugThrottler.(ProcessSetter); ok {
175		throttler.SetProcess(t.process)
176	}
177
178	return t, t
179}
180
181// addCodec adds registers injector and extractor for given propagation format if not already defined.
182func (t *Tracer) addCodec(format interface{}, injector Injector, extractor Extractor) {
183	if _, ok := t.injectors[format]; !ok {
184		t.injectors[format] = injector
185	}
186	if _, ok := t.extractors[format]; !ok {
187		t.extractors[format] = extractor
188	}
189}
190
191// StartSpan implements StartSpan() method of opentracing.Tracer.
192func (t *Tracer) StartSpan(
193	operationName string,
194	options ...opentracing.StartSpanOption,
195) opentracing.Span {
196	sso := opentracing.StartSpanOptions{}
197	for _, o := range options {
198		o.Apply(&sso)
199	}
200	return t.startSpanWithOptions(operationName, sso)
201}
202
203func (t *Tracer) startSpanWithOptions(
204	operationName string,
205	options opentracing.StartSpanOptions,
206) opentracing.Span {
207	if options.StartTime.IsZero() {
208		options.StartTime = t.timeNow()
209	}
210
211	// Predicate whether the given span context is a valid reference
212	// which may be used as parent / debug ID / baggage items source
213	isValidReference := func(ctx SpanContext) bool {
214		return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0
215	}
216
217	var references []Reference
218	var parent SpanContext
219	var hasParent bool // need this because `parent` is a value, not reference
220	for _, ref := range options.References {
221		ctx, ok := ref.ReferencedContext.(SpanContext)
222		if !ok {
223			t.logger.Error(fmt.Sprintf(
224				"Reference contains invalid type of SpanReference: %s",
225				reflect.ValueOf(ref.ReferencedContext)))
226			continue
227		}
228		if !isValidReference(ctx) {
229			continue
230		}
231		references = append(references, Reference{Type: ref.Type, Context: ctx})
232		if !hasParent {
233			parent = ctx
234			hasParent = ref.Type == opentracing.ChildOfRef
235		}
236	}
237	if !hasParent && isValidReference(parent) {
238		// If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
239		// the FollowFromRef as the parent
240		hasParent = true
241	}
242
243	rpcServer := false
244	if v, ok := options.Tags[ext.SpanKindRPCServer.Key]; ok {
245		rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
246	}
247
248	var samplerTags []Tag
249	var ctx SpanContext
250	newTrace := false
251	if !hasParent || !parent.IsValid() {
252		newTrace = true
253		ctx.traceID.Low = t.randomID()
254		if t.options.gen128Bit {
255			ctx.traceID.High = t.options.highTraceIDGenerator()
256		}
257		ctx.spanID = SpanID(ctx.traceID.Low)
258		ctx.parentID = 0
259		ctx.flags = byte(0)
260		if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
261			ctx.flags |= (flagSampled | flagDebug)
262			samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
263		} else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
264			ctx.flags |= flagSampled
265			samplerTags = tags
266		}
267	} else {
268		ctx.traceID = parent.traceID
269		if rpcServer && t.options.zipkinSharedRPCSpan {
270			// Support Zipkin's one-span-per-RPC model
271			ctx.spanID = parent.spanID
272			ctx.parentID = parent.parentID
273		} else {
274			ctx.spanID = SpanID(t.randomID())
275			ctx.parentID = parent.spanID
276		}
277		ctx.flags = parent.flags
278	}
279	if hasParent {
280		// copy baggage items
281		if l := len(parent.baggage); l > 0 {
282			ctx.baggage = make(map[string]string, len(parent.baggage))
283			for k, v := range parent.baggage {
284				ctx.baggage[k] = v
285			}
286		}
287	}
288
289	sp := t.newSpan()
290	sp.context = ctx
291	sp.observer = t.observer.OnStartSpan(sp, operationName, options)
292	return t.startSpanInternal(
293		sp,
294		operationName,
295		options.StartTime,
296		samplerTags,
297		options.Tags,
298		newTrace,
299		rpcServer,
300		references,
301	)
302}
303
304// Inject implements Inject() method of opentracing.Tracer
305func (t *Tracer) Inject(ctx opentracing.SpanContext, format interface{}, carrier interface{}) error {
306	c, ok := ctx.(SpanContext)
307	if !ok {
308		return opentracing.ErrInvalidSpanContext
309	}
310	if injector, ok := t.injectors[format]; ok {
311		return injector.Inject(c, carrier)
312	}
313	return opentracing.ErrUnsupportedFormat
314}
315
316// Extract implements Extract() method of opentracing.Tracer
317func (t *Tracer) Extract(
318	format interface{},
319	carrier interface{},
320) (opentracing.SpanContext, error) {
321	if extractor, ok := t.extractors[format]; ok {
322		return extractor.Extract(carrier)
323	}
324	return nil, opentracing.ErrUnsupportedFormat
325}
326
327// Close releases all resources used by the Tracer and flushes any remaining buffered spans.
328func (t *Tracer) Close() error {
329	t.reporter.Close()
330	t.sampler.Close()
331	if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
332		mgr.Close()
333	}
334	if throttler, ok := t.debugThrottler.(io.Closer); ok {
335		throttler.Close()
336	}
337	return nil
338}
339
340// Tags returns a slice of tracer-level tags.
341func (t *Tracer) Tags() []opentracing.Tag {
342	tags := make([]opentracing.Tag, len(t.tags))
343	for i, tag := range t.tags {
344		tags[i] = opentracing.Tag{Key: tag.key, Value: tag.value}
345	}
346	return tags
347}
348
349// newSpan returns an instance of a clean Span object.
350// If options.PoolSpans is true, the spans are retrieved from an object pool.
351func (t *Tracer) newSpan() *Span {
352	if !t.options.poolSpans {
353		return &Span{}
354	}
355	sp := t.spanPool.Get().(*Span)
356	sp.context = emptyContext
357	sp.tracer = nil
358	sp.tags = nil
359	sp.logs = nil
360	return sp
361}
362
363func (t *Tracer) startSpanInternal(
364	sp *Span,
365	operationName string,
366	startTime time.Time,
367	internalTags []Tag,
368	tags opentracing.Tags,
369	newTrace bool,
370	rpcServer bool,
371	references []Reference,
372) *Span {
373	sp.tracer = t
374	sp.operationName = operationName
375	sp.startTime = startTime
376	sp.duration = 0
377	sp.references = references
378	sp.firstInProcess = rpcServer || sp.context.parentID == 0
379	if len(tags) > 0 || len(internalTags) > 0 {
380		sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags))
381		copy(sp.tags, internalTags)
382		for k, v := range tags {
383			sp.observer.OnSetTag(k, v)
384			if k == string(ext.SamplingPriority) && !setSamplingPriority(sp, v) {
385				continue
386			}
387			sp.setTagNoLocking(k, v)
388		}
389	}
390	// emit metrics
391	if sp.context.IsSampled() {
392		t.metrics.SpansStartedSampled.Inc(1)
393		if newTrace {
394			// We cannot simply check for parentID==0 because in Zipkin model the
395			// server-side RPC span has the exact same trace/span/parent IDs as the
396			// calling client-side span, but obviously the server side span is
397			// no longer a root span of the trace.
398			t.metrics.TracesStartedSampled.Inc(1)
399		} else if sp.firstInProcess {
400			t.metrics.TracesJoinedSampled.Inc(1)
401		}
402	} else {
403		t.metrics.SpansStartedNotSampled.Inc(1)
404		if newTrace {
405			t.metrics.TracesStartedNotSampled.Inc(1)
406		} else if sp.firstInProcess {
407			t.metrics.TracesJoinedNotSampled.Inc(1)
408		}
409	}
410	return sp
411}
412
413func (t *Tracer) reportSpan(sp *Span) {
414	t.metrics.SpansFinished.Inc(1)
415	if sp.context.IsSampled() {
416		t.reporter.Report(sp)
417	}
418	if t.options.poolSpans {
419		t.spanPool.Put(sp)
420	}
421}
422
423// randomID generates a random trace/span ID, using tracer.random() generator.
424// It never returns 0.
425func (t *Tracer) randomID() uint64 {
426	val := t.randomNumber()
427	for val == 0 {
428		val = t.randomNumber()
429	}
430	return val
431}
432
433// (NB) span must hold the lock before making this call
434func (t *Tracer) setBaggage(sp *Span, key, value string) {
435	t.baggageSetter.setBaggage(sp, key, value)
436}
437
438// (NB) span must hold the lock before making this call
439func (t *Tracer) isDebugAllowed(operation string) bool {
440	return t.debugThrottler.IsAllowed(operation)
441}
442