1// Unless explicitly stated otherwise all files in this repository are licensed
2// under the Apache License Version 2.0.
3// This product includes software developed at Datadog (https://www.datadoghq.com/).
4// Copyright 2016-2019 Datadog, Inc.
5
6package tracer
7
8import (
9	"sync"
10
11	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
12	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
13	"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
14)
15
16var _ ddtrace.SpanContext = (*spanContext)(nil)
17
18// SpanContext represents a span state that can propagate to descendant spans
19// and across process boundaries. It contains all the information needed to
20// spawn a direct descendant of the span that it belongs to. It can be used
21// to create distributed tracing by propagating it using the provided interfaces.
22type spanContext struct {
23	// the below group should propagate only locally
24
25	trace *trace // reference to the trace that this span belongs too
26	span  *span  // reference to the span that hosts this context
27	drop  bool   // when true, the span will not be sent to the agent
28
29	// the below group should propagate cross-process
30
31	traceID uint64
32	spanID  uint64
33
34	mu      sync.RWMutex // guards below fields
35	baggage map[string]string
36	origin  string // e.g. "synthetics"
37}
38
39// newSpanContext creates a new SpanContext to serve as context for the given
40// span. If the provided parent is not nil, the context will inherit the trace,
41// baggage and other values from it. This method also pushes the span into the
42// new context's trace and as a result, it should not be called multiple times
43// for the same span.
44func newSpanContext(span *span, parent *spanContext) *spanContext {
45	context := &spanContext{
46		traceID: span.TraceID,
47		spanID:  span.SpanID,
48		span:    span,
49	}
50	if parent != nil {
51		context.trace = parent.trace
52		context.drop = parent.drop
53		context.origin = parent.origin
54		parent.ForeachBaggageItem(func(k, v string) bool {
55			context.setBaggageItem(k, v)
56			return true
57		})
58	}
59	if context.trace == nil {
60		context.trace = newTrace()
61	}
62	if context.trace.root == nil {
63		// first span in the trace can safely be assumed to be the root
64		context.trace.root = span
65	}
66	// put span in context's trace
67	context.trace.push(span)
68	return context
69}
70
71// SpanID implements ddtrace.SpanContext.
72func (c *spanContext) SpanID() uint64 { return c.spanID }
73
74// TraceID implements ddtrace.SpanContext.
75func (c *spanContext) TraceID() uint64 { return c.traceID }
76
77// ForeachBaggageItem implements ddtrace.SpanContext.
78func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
79	c.mu.RLock()
80	defer c.mu.RUnlock()
81	for k, v := range c.baggage {
82		if !handler(k, v) {
83			break
84		}
85	}
86}
87
88func (c *spanContext) setSamplingPriority(p int) {
89	if c.trace == nil {
90		c.trace = newTrace()
91	}
92	c.trace.setSamplingPriority(float64(p))
93}
94
95func (c *spanContext) samplingPriority() int {
96	if c.trace == nil {
97		return 0
98	}
99	return c.trace.samplingPriority()
100}
101
102func (c *spanContext) hasSamplingPriority() bool {
103	return c.trace != nil && c.trace.hasSamplingPriority()
104}
105
106func (c *spanContext) setBaggageItem(key, val string) {
107	c.mu.Lock()
108	defer c.mu.Unlock()
109	if c.baggage == nil {
110		c.baggage = make(map[string]string, 1)
111	}
112	c.baggage[key] = val
113}
114
115func (c *spanContext) baggageItem(key string) string {
116	c.mu.RLock()
117	defer c.mu.RUnlock()
118	return c.baggage[key]
119}
120
121// finish marks this span as finished in the trace.
122func (c *spanContext) finish() { c.trace.finishedOne(c.span) }
123
124// trace contains shared context information about a trace, such as sampling
125// priority, the root reference and a buffer of the spans which are part of the
126// trace, if these exist.
127type trace struct {
128	mu       sync.RWMutex // guards below fields
129	spans    []*span      // all the spans that are part of this trace
130	finished int          // the number of finished spans
131	full     bool         // signifies that the span buffer is full
132	priority *float64     // sampling priority
133	locked   bool         // specifies if the sampling priority can be altered
134
135	// root specifies the root of the trace, if known; it is nil when a span
136	// context is extracted from a carrier, at which point there are no spans in
137	// the trace yet.
138	root *span
139}
140
var (
	// traceStartSize is the initial capacity of a trace's span buffer. Room
	// for a handful of spans is reserved up front: the slice holds pointers,
	// so this is cheap compared to the spans themselves, and it avoids
	// re-allocating over and over. Could be fine-tuned at runtime.
	traceStartSize = 10
	// traceMaxSize is the maximum number of spans kept in memory for a single
	// trace. It exists to avoid memory leaks: once a trace reaches this size
	// its buffer is released and further spans are ignored, which results in
	// incomplete tracing data but ensures that the original program
	// continues to work as expected.
	traceMaxSize = 100000
)
153
154// newTrace creates a new trace using the given callback which will be called
155// upon completion of the trace.
156func newTrace() *trace {
157	return &trace{spans: make([]*span, 0, traceStartSize)}
158}
159
160func (t *trace) hasSamplingPriority() bool {
161	t.mu.RLock()
162	defer t.mu.RUnlock()
163	return t.priority != nil
164}
165
166func (t *trace) samplingPriority() int {
167	t.mu.RLock()
168	defer t.mu.RUnlock()
169	if t.priority == nil {
170		return 0
171	}
172	return int(*t.priority)
173}
174
175func (t *trace) setSamplingPriority(p float64) {
176	t.mu.Lock()
177	defer t.mu.Unlock()
178	t.setSamplingPriorityLocked(p)
179}
180
181func (t *trace) setSamplingPriorityLocked(p float64) {
182	if t.locked {
183		return
184	}
185	if t.root == nil {
186		// this trace is distributed (no local root); modifications
187		// to the sampling priority are not allowed.
188		t.locked = true
189	}
190	if t.priority == nil {
191		t.priority = new(float64)
192	}
193	*t.priority = p
194}
195
196// push pushes a new span into the trace. If the buffer is full, it returns
197// a errBufferFull error.
198func (t *trace) push(sp *span) {
199	t.mu.Lock()
200	defer t.mu.Unlock()
201	if t.full {
202		return
203	}
204	if len(t.spans) >= traceMaxSize {
205		// capacity is reached, we will not be able to complete this trace.
206		t.full = true
207		t.spans = nil // GC
208		log.Error("trace buffer full (%d), dropping trace", traceMaxSize)
209		return
210	}
211	if v, ok := sp.Metrics[keySamplingPriority]; ok {
212		t.setSamplingPriorityLocked(v)
213	}
214	t.spans = append(t.spans, sp)
215}
216
217// finishedOne aknowledges that another span in the trace has finished, and checks
218// if the trace is complete, in which case it calls the onFinish function. It uses
219// the given priority, if non-nil, to mark the root span.
220func (t *trace) finishedOne(s *span) {
221	t.mu.Lock()
222	defer t.mu.Unlock()
223	if t.full {
224		// capacity has been reached, the buffer is no longer tracking
225		// all the spans in the trace, so the below conditions will not
226		// be accurate and would trigger a pre-mature flush, exposing us
227		// to a race condition where spans can be modified while flushing.
228		return
229	}
230	t.finished++
231	if s == t.root && t.priority != nil {
232		// after the root has finished we lock down the priority;
233		// we won't be able to make changes to a span after finishing
234		// without causing a race condition.
235		t.root.setMetric(keySamplingPriority, *t.priority)
236		t.locked = true
237	}
238	if len(t.spans) != t.finished {
239		return
240	}
241	if tr, ok := internal.GetGlobalTracer().(*tracer); ok {
242		// we have a tracer that can receive completed traces.
243		tr.pushTrace(t.spans)
244	}
245	t.spans = nil
246	t.finished = 0 // important, because a buffer can be used for several flushes
247}
248