// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package jaeger
16
17import (
18	"sync"
19	"sync/atomic"
20	"time"
21
22	"github.com/opentracing/opentracing-go"
23	"github.com/opentracing/opentracing-go/ext"
24	"github.com/opentracing/opentracing-go/log"
25)
26
// Span implements opentracing.Span.
type Span struct {
	// referenceCounter counts extra references taken with Retain. It is
	// updated atomically and is used to extend the lifetime of the object
	// before it is returned to the pool (see Release).
	referenceCounter int32

	// RWMutex is the lock the Span methods take around reads and writes of
	// the mutable fields below.
	sync.RWMutex

	// tracer is the Tracer this span belongs to; the methods use it for the
	// clock, the sampler, baggage handling, reporting and the span allocator.
	tracer *Tracer

	// TODO: (breaking change) change to use a pointer
	context SpanContext

	// The name of the "operation" this span is an instance of.
	// Known as a "span name" in some implementations.
	operationName string

	// firstInProcess, if true, indicates that this span is the root of the (sub)tree
	// of spans in the current process. In other words it's true for the root spans,
	// and the ingress spans when the process joins another trace.
	firstInProcess bool

	// startTime is the timestamp indicating when the span began, with microseconds precision.
	startTime time.Time

	// duration is the duration of the span with microseconds precision.
	// Zero value means duration is unknown.
	duration time.Duration

	// tags attached to this span
	tags []Tag

	// The span's "micro-log"
	logs []opentracing.LogRecord

	// references for this span
	references []Reference

	// observer is notified when the operation name changes, when a tag is
	// set, and when the span finishes.
	observer ContribSpanObserver
}
67
// Tag is a simple key value wrapper: key holds the tag name and value holds
// the tag payload, which may be of any type.
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
type Tag struct {
	key   string
	value interface{}
}
74
75// NewTag creates a new Tag.
76// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
77func NewTag(key string, value interface{}) Tag {
78	return Tag{key: key, value: value}
79}
80
81// SetOperationName sets or changes the operation name.
82func (s *Span) SetOperationName(operationName string) opentracing.Span {
83	s.Lock()
84	s.operationName = operationName
85	s.Unlock()
86	if !s.isSamplingFinalized() {
87		decision := s.tracer.sampler.OnSetOperationName(s, operationName)
88		s.applySamplingDecision(decision, true)
89	}
90	s.observer.OnSetOperationName(operationName)
91	return s
92}
93
94// SetTag implements SetTag() of opentracing.Span
95func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
96	return s.setTagInternal(key, value, true)
97}
98
99func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
100	s.observer.OnSetTag(key, value)
101	if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
102		return s
103	}
104	if !s.isSamplingFinalized() {
105		decision := s.tracer.sampler.OnSetTag(s, key, value)
106		s.applySamplingDecision(decision, lock)
107	}
108	if s.isWriteable() {
109		if lock {
110			s.Lock()
111			defer s.Unlock()
112		}
113		s.appendTagNoLocking(key, value)
114	}
115	return s
116}
117
118// SpanContext returns span context
119func (s *Span) SpanContext() SpanContext {
120	s.Lock()
121	defer s.Unlock()
122	return s.context
123}
124
125// StartTime returns span start time
126func (s *Span) StartTime() time.Time {
127	s.Lock()
128	defer s.Unlock()
129	return s.startTime
130}
131
132// Duration returns span duration
133func (s *Span) Duration() time.Duration {
134	s.Lock()
135	defer s.Unlock()
136	return s.duration
137}
138
139// Tags returns tags for span
140func (s *Span) Tags() opentracing.Tags {
141	s.Lock()
142	defer s.Unlock()
143	var result = make(opentracing.Tags, len(s.tags))
144	for _, tag := range s.tags {
145		result[tag.key] = tag.value
146	}
147	return result
148}
149
150// Logs returns micro logs for span
151func (s *Span) Logs() []opentracing.LogRecord {
152	s.Lock()
153	defer s.Unlock()
154
155	return append([]opentracing.LogRecord(nil), s.logs...)
156}
157
158// References returns references for this span
159func (s *Span) References() []opentracing.SpanReference {
160	s.Lock()
161	defer s.Unlock()
162
163	if s.references == nil || len(s.references) == 0 {
164		return nil
165	}
166
167	result := make([]opentracing.SpanReference, len(s.references))
168	for i, r := range s.references {
169		result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
170	}
171	return result
172}
173
174func (s *Span) appendTagNoLocking(key string, value interface{}) {
175	s.tags = append(s.tags, Tag{key: key, value: value})
176}
177
178// LogFields implements opentracing.Span API
179func (s *Span) LogFields(fields ...log.Field) {
180	s.Lock()
181	defer s.Unlock()
182	if !s.context.IsSampled() {
183		return
184	}
185	s.logFieldsNoLocking(fields...)
186}
187
188// this function should only be called while holding a Write lock
189func (s *Span) logFieldsNoLocking(fields ...log.Field) {
190	lr := opentracing.LogRecord{
191		Fields:    fields,
192		Timestamp: time.Now(),
193	}
194	s.appendLogNoLocking(lr)
195}
196
197// LogKV implements opentracing.Span API
198func (s *Span) LogKV(alternatingKeyValues ...interface{}) {
199	s.RLock()
200	sampled := s.context.IsSampled()
201	s.RUnlock()
202	if !sampled {
203		return
204	}
205	fields, err := log.InterleavedKVToFields(alternatingKeyValues...)
206	if err != nil {
207		s.LogFields(log.Error(err), log.String("function", "LogKV"))
208		return
209	}
210	s.LogFields(fields...)
211}
212
213// LogEvent implements opentracing.Span API
214func (s *Span) LogEvent(event string) {
215	s.Log(opentracing.LogData{Event: event})
216}
217
218// LogEventWithPayload implements opentracing.Span API
219func (s *Span) LogEventWithPayload(event string, payload interface{}) {
220	s.Log(opentracing.LogData{Event: event, Payload: payload})
221}
222
223// Log implements opentracing.Span API
224func (s *Span) Log(ld opentracing.LogData) {
225	s.Lock()
226	defer s.Unlock()
227	if s.context.IsSampled() {
228		if ld.Timestamp.IsZero() {
229			ld.Timestamp = s.tracer.timeNow()
230		}
231		s.appendLogNoLocking(ld.ToLogRecord())
232	}
233}
234
// appendLogNoLocking appends lr to the span's logs.
// This function should only be called while holding a Write lock.
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
	// TODO add logic to limit number of logs per span (issue #46)
	s.logs = append(s.logs, lr)
}
240
// SetBaggageItem implements SetBaggageItem() of opentracing.Span.
// It holds the write lock while delegating to the tracer's setBaggage, and
// returns the span itself.
func (s *Span) SetBaggageItem(key, value string) opentracing.Span {
	s.Lock()
	defer s.Unlock()
	s.tracer.setBaggage(s, key, value)
	return s
}
248
249// BaggageItem implements BaggageItem() of opentracing.SpanContext
250func (s *Span) BaggageItem(key string) string {
251	s.RLock()
252	defer s.RUnlock()
253	return s.context.baggage[key]
254}
255
256// Finish implements opentracing.Span API
257// After finishing the Span object it returns back to the allocator unless the reporter retains it again,
258// so after that, the Span object should no longer be used because it won't be valid anymore.
259func (s *Span) Finish() {
260	s.FinishWithOptions(opentracing.FinishOptions{})
261}
262
263// FinishWithOptions implements opentracing.Span API
264func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
265	if options.FinishTime.IsZero() {
266		options.FinishTime = s.tracer.timeNow()
267	}
268	s.observer.OnFinish(options)
269	s.Lock()
270	s.duration = options.FinishTime.Sub(s.startTime)
271	s.Unlock()
272	if !s.isSamplingFinalized() {
273		decision := s.tracer.sampler.OnFinishSpan(s)
274		s.applySamplingDecision(decision, true)
275	}
276	if s.context.IsSampled() {
277		if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
278			s.Lock()
279			// Note: bulk logs are not subject to maxLogsPerSpan limit
280			if options.LogRecords != nil {
281				s.logs = append(s.logs, options.LogRecords...)
282			}
283			for _, ld := range options.BulkLogData {
284				s.logs = append(s.logs, ld.ToLogRecord())
285			}
286			s.Unlock()
287		}
288	}
289	// call reportSpan even for non-sampled traces, to return span to the pool
290	// and update metrics counter
291	s.tracer.reportSpan(s)
292}
293
294// Context implements opentracing.Span API
295func (s *Span) Context() opentracing.SpanContext {
296	s.Lock()
297	defer s.Unlock()
298	return s.context
299}
300
// Tracer implements opentracing.Span API. It returns the span's tracer field;
// no lock is taken for this read.
func (s *Span) Tracer() opentracing.Tracer {
	return s.tracer
}
305
// String returns the string form of the span context, read while holding the
// read lock.
func (s *Span) String() string {
	s.RLock()
	defer s.RUnlock()
	return s.context.String()
}
311
312// OperationName allows retrieving current operation name.
313func (s *Span) OperationName() string {
314	s.RLock()
315	defer s.RUnlock()
316	return s.operationName
317}
318
// Retain atomically increments the reference counter to extend the lifetime
// of the object, and returns the span so calls can be chained.
func (s *Span) Retain() *Span {
	atomic.AddInt32(&s.referenceCounter, 1)
	return s
}
324
325// Release decrements object counter and return to the
326// allocator manager  when counter will below zero
327func (s *Span) Release() {
328	if atomic.AddInt32(&s.referenceCounter, -1) == -1 {
329		s.tracer.spanAllocator.Put(s)
330	}
331}
332
333// reset span state and release unused data
334func (s *Span) reset() {
335	s.firstInProcess = false
336	s.context = emptyContext
337	s.operationName = ""
338	s.tracer = nil
339	s.startTime = time.Time{}
340	s.duration = 0
341	s.observer = nil
342	atomic.StoreInt32(&s.referenceCounter, 0)
343
344	// Note: To reuse memory we can save the pointers on the heap
345	s.tags = s.tags[:0]
346	s.logs = s.logs[:0]
347	s.references = s.references[:0]
348}
349
// serviceName returns the service name configured on the span's tracer.
func (s *Span) serviceName() string {
	return s.tracer.serviceName
}
353
354func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
355	if !decision.Retryable {
356		s.context.samplingState.setFinal()
357	}
358	if decision.Sample {
359		s.context.samplingState.setSampled()
360		if len(decision.Tags) > 0 {
361			if lock {
362				s.Lock()
363				defer s.Unlock()
364			}
365			for _, tag := range decision.Tags {
366				s.appendTagNoLocking(tag.key, tag.value)
367			}
368		}
369	}
370}
371
372// Span can be written to if it is sampled or the sampling decision has not been finalized.
373func (s *Span) isWriteable() bool {
374	state := s.context.samplingState
375	return !state.isFinal() || state.isSampled()
376}
377
// isSamplingFinalized reports whether the sampling state of the span context
// has been marked final.
func (s *Span) isSamplingFinalized() bool {
	return s.context.samplingState.isFinal()
}
381
382// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
383// The behavior of setSamplingPriority is surprising
384// If noDebugFlagOnForcedSampling is set
385//     setSamplingPriority(span, 1) always sets only flagSampled
386// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
387//     setSamplingPriority(span, 1) sets both flagSampled and flagDebug
388// However,
389//     setSamplingPriority(span, 0) always only resets flagSampled
390//
391// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can
392// leave flagDebug set
393func setSamplingPriority(s *Span, value interface{}) bool {
394	val, ok := value.(uint16)
395	if !ok {
396		return false
397	}
398	if val == 0 {
399		s.context.samplingState.unsetSampled()
400		s.context.samplingState.setFinal()
401		return true
402	}
403	if s.tracer.options.noDebugFlagOnForcedSampling {
404		s.context.samplingState.setSampled()
405		s.context.samplingState.setFinal()
406		return true
407	} else if s.tracer.isDebugAllowed(s.operationName) {
408		s.context.samplingState.setDebugAndSampled()
409		s.context.samplingState.setFinal()
410		return true
411	}
412	return false
413}
414
// EnableFirehose enables the firehose flag on the span context, holding the
// span's write lock while doing so.
func EnableFirehose(s *Span) {
	s.Lock()
	defer s.Unlock()
	s.context.samplingState.setFirehose()
}
421