1// Unless explicitly stated otherwise all files in this repository are licensed 2// under the Apache License Version 2.0. 3// This product includes software developed at Datadog (https://www.datadoghq.com/). 4// Copyright 2016-2019 Datadog, Inc. 5 6package tracer 7 8import ( 9 "sync" 10 11 "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" 12 "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" 13 "gopkg.in/DataDog/dd-trace-go.v1/internal/log" 14) 15 16var _ ddtrace.SpanContext = (*spanContext)(nil) 17 18// SpanContext represents a span state that can propagate to descendant spans 19// and across process boundaries. It contains all the information needed to 20// spawn a direct descendant of the span that it belongs to. It can be used 21// to create distributed tracing by propagating it using the provided interfaces. 22type spanContext struct { 23 // the below group should propagate only locally 24 25 trace *trace // reference to the trace that this span belongs too 26 span *span // reference to the span that hosts this context 27 drop bool // when true, the span will not be sent to the agent 28 29 // the below group should propagate cross-process 30 31 traceID uint64 32 spanID uint64 33 34 mu sync.RWMutex // guards below fields 35 baggage map[string]string 36 origin string // e.g. "synthetics" 37} 38 39// newSpanContext creates a new SpanContext to serve as context for the given 40// span. If the provided parent is not nil, the context will inherit the trace, 41// baggage and other values from it. This method also pushes the span into the 42// new context's trace and as a result, it should not be called multiple times 43// for the same span. 44func newSpanContext(span *span, parent *spanContext) *spanContext { 45 context := &spanContext{ 46 traceID: span.TraceID, 47 spanID: span.SpanID, 48 span: span, 49 } 50 if parent != nil { 51 context.trace = parent.trace 52 context.drop = parent.drop 53 context.origin = parent.origin 54 parent.ForeachBaggageItem(func(k, v string) bool { 55 context.setBaggageItem(k, v) 56 return true 57 }) 58 } 59 if context.trace == nil { 60 context.trace = newTrace() 61 } 62 if context.trace.root == nil { 63 // first span in the trace can safely be assumed to be the root 64 context.trace.root = span 65 } 66 // put span in context's trace 67 context.trace.push(span) 68 return context 69} 70 71// SpanID implements ddtrace.SpanContext. 72func (c *spanContext) SpanID() uint64 { return c.spanID } 73 74// TraceID implements ddtrace.SpanContext. 75func (c *spanContext) TraceID() uint64 { return c.traceID } 76 77// ForeachBaggageItem implements ddtrace.SpanContext. 78func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) { 79 c.mu.RLock() 80 defer c.mu.RUnlock() 81 for k, v := range c.baggage { 82 if !handler(k, v) { 83 break 84 } 85 } 86} 87 88func (c *spanContext) setSamplingPriority(p int) { 89 if c.trace == nil { 90 c.trace = newTrace() 91 } 92 c.trace.setSamplingPriority(float64(p)) 93} 94 95func (c *spanContext) samplingPriority() int { 96 if c.trace == nil { 97 return 0 98 } 99 return c.trace.samplingPriority() 100} 101 102func (c *spanContext) hasSamplingPriority() bool { 103 return c.trace != nil && c.trace.hasSamplingPriority() 104} 105 106func (c *spanContext) setBaggageItem(key, val string) { 107 c.mu.Lock() 108 defer c.mu.Unlock() 109 if c.baggage == nil { 110 c.baggage = make(map[string]string, 1) 111 } 112 c.baggage[key] = val 113} 114 115func (c *spanContext) baggageItem(key string) string { 116 c.mu.RLock() 117 defer c.mu.RUnlock() 118 return c.baggage[key] 119} 120 121// finish marks this span as finished in the trace. 122func (c *spanContext) finish() { c.trace.finishedOne(c.span) } 123 124// trace contains shared context information about a trace, such as sampling 125// priority, the root reference and a buffer of the spans which are part of the 126// trace, if these exist. 127type trace struct { 128 mu sync.RWMutex // guards below fields 129 spans []*span // all the spans that are part of this trace 130 finished int // the number of finished spans 131 full bool // signifies that the span buffer is full 132 priority *float64 // sampling priority 133 locked bool // specifies if the sampling priority can be altered 134 135 // root specifies the root of the trace, if known; it is nil when a span 136 // context is extracted from a carrier, at which point there are no spans in 137 // the trace yet. 138 root *span 139} 140 141var ( 142 // traceStartSize is the initial size of our trace buffer, 143 // by default we allocate for a handful of spans within the trace, 144 // reasonable as span is actually way bigger, and avoids re-allocating 145 // over and over. Could be fine-tuned at runtime. 146 traceStartSize = 10 147 // traceMaxSize is the maximum number of spans we keep in memory. 148 // This is to avoid memory leaks, if above that value, spans are randomly 149 // dropped and ignore, resulting in corrupted tracing data, but ensuring 150 // original program continues to work as expected. 151 traceMaxSize = int(1e5) 152) 153 154// newTrace creates a new trace using the given callback which will be called 155// upon completion of the trace. 156func newTrace() *trace { 157 return &trace{spans: make([]*span, 0, traceStartSize)} 158} 159 160func (t *trace) hasSamplingPriority() bool { 161 t.mu.RLock() 162 defer t.mu.RUnlock() 163 return t.priority != nil 164} 165 166func (t *trace) samplingPriority() int { 167 t.mu.RLock() 168 defer t.mu.RUnlock() 169 if t.priority == nil { 170 return 0 171 } 172 return int(*t.priority) 173} 174 175func (t *trace) setSamplingPriority(p float64) { 176 t.mu.Lock() 177 defer t.mu.Unlock() 178 t.setSamplingPriorityLocked(p) 179} 180 181func (t *trace) setSamplingPriorityLocked(p float64) { 182 if t.locked { 183 return 184 } 185 if t.root == nil { 186 // this trace is distributed (no local root); modifications 187 // to the sampling priority are not allowed. 188 t.locked = true 189 } 190 if t.priority == nil { 191 t.priority = new(float64) 192 } 193 *t.priority = p 194} 195 196// push pushes a new span into the trace. If the buffer is full, it returns 197// a errBufferFull error. 198func (t *trace) push(sp *span) { 199 t.mu.Lock() 200 defer t.mu.Unlock() 201 if t.full { 202 return 203 } 204 if len(t.spans) >= traceMaxSize { 205 // capacity is reached, we will not be able to complete this trace. 206 t.full = true 207 t.spans = nil // GC 208 log.Error("trace buffer full (%d), dropping trace", traceMaxSize) 209 return 210 } 211 if v, ok := sp.Metrics[keySamplingPriority]; ok { 212 t.setSamplingPriorityLocked(v) 213 } 214 t.spans = append(t.spans, sp) 215} 216 217// finishedOne aknowledges that another span in the trace has finished, and checks 218// if the trace is complete, in which case it calls the onFinish function. It uses 219// the given priority, if non-nil, to mark the root span. 220func (t *trace) finishedOne(s *span) { 221 t.mu.Lock() 222 defer t.mu.Unlock() 223 if t.full { 224 // capacity has been reached, the buffer is no longer tracking 225 // all the spans in the trace, so the below conditions will not 226 // be accurate and would trigger a pre-mature flush, exposing us 227 // to a race condition where spans can be modified while flushing. 228 return 229 } 230 t.finished++ 231 if s == t.root && t.priority != nil { 232 // after the root has finished we lock down the priority; 233 // we won't be able to make changes to a span after finishing 234 // without causing a race condition. 235 t.root.setMetric(keySamplingPriority, *t.priority) 236 t.locked = true 237 } 238 if len(t.spans) != t.finished { 239 return 240 } 241 if tr, ok := internal.GetGlobalTracer().(*tracer); ok { 242 // we have a tracer that can receive completed traces. 243 tr.pushTrace(t.spans) 244 } 245 t.spans = nil 246 t.finished = 0 // important, because a buffer can be used for several flushes 247} 248