1// Copyright (c) 2017-2018 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger 16 17import ( 18 "sync" 19 "sync/atomic" 20 "time" 21 22 "github.com/opentracing/opentracing-go" 23 "github.com/opentracing/opentracing-go/ext" 24 "github.com/opentracing/opentracing-go/log" 25) 26 27// Span implements opentracing.Span 28type Span struct { 29 // referenceCounter used to increase the lifetime of 30 // the object before return it into the pool. 31 referenceCounter int32 32 33 sync.RWMutex 34 35 tracer *Tracer 36 37 // TODO: (breaking change) change to use a pointer 38 context SpanContext 39 40 // The name of the "operation" this span is an instance of. 41 // Known as a "span name" in some implementations. 42 operationName string 43 44 // firstInProcess, if true, indicates that this span is the root of the (sub)tree 45 // of spans in the current process. In other words it's true for the root spans, 46 // and the ingress spans when the process joins another trace. 47 firstInProcess bool 48 49 // startTime is the timestamp indicating when the span began, with microseconds precision. 50 startTime time.Time 51 52 // duration returns duration of the span with microseconds precision. 53 // Zero value means duration is unknown. 54 duration time.Duration 55 56 // tags attached to this span 57 tags []Tag 58 59 // The span's "micro-log" 60 logs []opentracing.LogRecord 61 62 // references for this span 63 references []Reference 64 65 observer ContribSpanObserver 66} 67 68// Tag is a simple key value wrapper. 69// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead. 70type Tag struct { 71 key string 72 value interface{} 73} 74 75// NewTag creates a new Tag. 76// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead. 77func NewTag(key string, value interface{}) Tag { 78 return Tag{key: key, value: value} 79} 80 81// SetOperationName sets or changes the operation name. 82func (s *Span) SetOperationName(operationName string) opentracing.Span { 83 s.Lock() 84 s.operationName = operationName 85 s.Unlock() 86 if !s.isSamplingFinalized() { 87 decision := s.tracer.sampler.OnSetOperationName(s, operationName) 88 s.applySamplingDecision(decision, true) 89 } 90 s.observer.OnSetOperationName(operationName) 91 return s 92} 93 94// SetTag implements SetTag() of opentracing.Span 95func (s *Span) SetTag(key string, value interface{}) opentracing.Span { 96 return s.setTagInternal(key, value, true) 97} 98 99func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span { 100 s.observer.OnSetTag(key, value) 101 if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) { 102 return s 103 } 104 if !s.isSamplingFinalized() { 105 decision := s.tracer.sampler.OnSetTag(s, key, value) 106 s.applySamplingDecision(decision, lock) 107 } 108 if s.isWriteable() { 109 if lock { 110 s.Lock() 111 defer s.Unlock() 112 } 113 s.appendTagNoLocking(key, value) 114 } 115 return s 116} 117 118// SpanContext returns span context 119func (s *Span) SpanContext() SpanContext { 120 s.Lock() 121 defer s.Unlock() 122 return s.context 123} 124 125// StartTime returns span start time 126func (s *Span) StartTime() time.Time { 127 s.Lock() 128 defer s.Unlock() 129 return s.startTime 130} 131 132// Duration returns span duration 133func (s *Span) Duration() time.Duration { 134 s.Lock() 135 defer s.Unlock() 136 return s.duration 137} 138 139// Tags returns tags for span 140func (s *Span) Tags() opentracing.Tags { 141 s.Lock() 142 defer s.Unlock() 143 var result = make(opentracing.Tags, len(s.tags)) 144 for _, tag := range s.tags { 145 result[tag.key] = tag.value 146 } 147 return result 148} 149 150// Logs returns micro logs for span 151func (s *Span) Logs() []opentracing.LogRecord { 152 s.Lock() 153 defer s.Unlock() 154 155 return append([]opentracing.LogRecord(nil), s.logs...) 156} 157 158// References returns references for this span 159func (s *Span) References() []opentracing.SpanReference { 160 s.Lock() 161 defer s.Unlock() 162 163 if s.references == nil || len(s.references) == 0 { 164 return nil 165 } 166 167 result := make([]opentracing.SpanReference, len(s.references)) 168 for i, r := range s.references { 169 result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context} 170 } 171 return result 172} 173 174func (s *Span) appendTagNoLocking(key string, value interface{}) { 175 s.tags = append(s.tags, Tag{key: key, value: value}) 176} 177 178// LogFields implements opentracing.Span API 179func (s *Span) LogFields(fields ...log.Field) { 180 s.Lock() 181 defer s.Unlock() 182 if !s.context.IsSampled() { 183 return 184 } 185 s.logFieldsNoLocking(fields...) 186} 187 188// this function should only be called while holding a Write lock 189func (s *Span) logFieldsNoLocking(fields ...log.Field) { 190 lr := opentracing.LogRecord{ 191 Fields: fields, 192 Timestamp: time.Now(), 193 } 194 s.appendLogNoLocking(lr) 195} 196 197// LogKV implements opentracing.Span API 198func (s *Span) LogKV(alternatingKeyValues ...interface{}) { 199 s.RLock() 200 sampled := s.context.IsSampled() 201 s.RUnlock() 202 if !sampled { 203 return 204 } 205 fields, err := log.InterleavedKVToFields(alternatingKeyValues...) 206 if err != nil { 207 s.LogFields(log.Error(err), log.String("function", "LogKV")) 208 return 209 } 210 s.LogFields(fields...) 211} 212 213// LogEvent implements opentracing.Span API 214func (s *Span) LogEvent(event string) { 215 s.Log(opentracing.LogData{Event: event}) 216} 217 218// LogEventWithPayload implements opentracing.Span API 219func (s *Span) LogEventWithPayload(event string, payload interface{}) { 220 s.Log(opentracing.LogData{Event: event, Payload: payload}) 221} 222 223// Log implements opentracing.Span API 224func (s *Span) Log(ld opentracing.LogData) { 225 s.Lock() 226 defer s.Unlock() 227 if s.context.IsSampled() { 228 if ld.Timestamp.IsZero() { 229 ld.Timestamp = s.tracer.timeNow() 230 } 231 s.appendLogNoLocking(ld.ToLogRecord()) 232 } 233} 234 235// this function should only be called while holding a Write lock 236func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) { 237 // TODO add logic to limit number of logs per span (issue #46) 238 s.logs = append(s.logs, lr) 239} 240 241// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext 242func (s *Span) SetBaggageItem(key, value string) opentracing.Span { 243 s.Lock() 244 defer s.Unlock() 245 s.tracer.setBaggage(s, key, value) 246 return s 247} 248 249// BaggageItem implements BaggageItem() of opentracing.SpanContext 250func (s *Span) BaggageItem(key string) string { 251 s.RLock() 252 defer s.RUnlock() 253 return s.context.baggage[key] 254} 255 256// Finish implements opentracing.Span API 257// After finishing the Span object it returns back to the allocator unless the reporter retains it again, 258// so after that, the Span object should no longer be used because it won't be valid anymore. 259func (s *Span) Finish() { 260 s.FinishWithOptions(opentracing.FinishOptions{}) 261} 262 263// FinishWithOptions implements opentracing.Span API 264func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { 265 if options.FinishTime.IsZero() { 266 options.FinishTime = s.tracer.timeNow() 267 } 268 s.observer.OnFinish(options) 269 s.Lock() 270 s.duration = options.FinishTime.Sub(s.startTime) 271 s.Unlock() 272 if !s.isSamplingFinalized() { 273 decision := s.tracer.sampler.OnFinishSpan(s) 274 s.applySamplingDecision(decision, true) 275 } 276 if s.context.IsSampled() { 277 if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 { 278 s.Lock() 279 // Note: bulk logs are not subject to maxLogsPerSpan limit 280 if options.LogRecords != nil { 281 s.logs = append(s.logs, options.LogRecords...) 282 } 283 for _, ld := range options.BulkLogData { 284 s.logs = append(s.logs, ld.ToLogRecord()) 285 } 286 s.Unlock() 287 } 288 } 289 // call reportSpan even for non-sampled traces, to return span to the pool 290 // and update metrics counter 291 s.tracer.reportSpan(s) 292} 293 294// Context implements opentracing.Span API 295func (s *Span) Context() opentracing.SpanContext { 296 s.Lock() 297 defer s.Unlock() 298 return s.context 299} 300 301// Tracer implements opentracing.Span API 302func (s *Span) Tracer() opentracing.Tracer { 303 return s.tracer 304} 305 306func (s *Span) String() string { 307 s.RLock() 308 defer s.RUnlock() 309 return s.context.String() 310} 311 312// OperationName allows retrieving current operation name. 313func (s *Span) OperationName() string { 314 s.RLock() 315 defer s.RUnlock() 316 return s.operationName 317} 318 319// Retain increases object counter to increase the lifetime of the object 320func (s *Span) Retain() *Span { 321 atomic.AddInt32(&s.referenceCounter, 1) 322 return s 323} 324 325// Release decrements object counter and return to the 326// allocator manager when counter will below zero 327func (s *Span) Release() { 328 if atomic.AddInt32(&s.referenceCounter, -1) == -1 { 329 s.tracer.spanAllocator.Put(s) 330 } 331} 332 333// reset span state and release unused data 334func (s *Span) reset() { 335 s.firstInProcess = false 336 s.context = emptyContext 337 s.operationName = "" 338 s.tracer = nil 339 s.startTime = time.Time{} 340 s.duration = 0 341 s.observer = nil 342 atomic.StoreInt32(&s.referenceCounter, 0) 343 344 // Note: To reuse memory we can save the pointers on the heap 345 s.tags = s.tags[:0] 346 s.logs = s.logs[:0] 347 s.references = s.references[:0] 348} 349 350func (s *Span) serviceName() string { 351 return s.tracer.serviceName 352} 353 354func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) { 355 if !decision.Retryable { 356 s.context.samplingState.setFinal() 357 } 358 if decision.Sample { 359 s.context.samplingState.setSampled() 360 if len(decision.Tags) > 0 { 361 if lock { 362 s.Lock() 363 defer s.Unlock() 364 } 365 for _, tag := range decision.Tags { 366 s.appendTagNoLocking(tag.key, tag.value) 367 } 368 } 369 } 370} 371 372// Span can be written to if it is sampled or the sampling decision has not been finalized. 373func (s *Span) isWriteable() bool { 374 state := s.context.samplingState 375 return !state.isFinal() || state.isSampled() 376} 377 378func (s *Span) isSamplingFinalized() bool { 379 return s.context.samplingState.isFinal() 380} 381 382// setSamplingPriority returns true if the flag was updated successfully, false otherwise. 383// The behavior of setSamplingPriority is surprising 384// If noDebugFlagOnForcedSampling is set 385// setSamplingPriority(span, 1) always sets only flagSampled 386// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes 387// setSamplingPriority(span, 1) sets both flagSampled and flagDebug 388// However, 389// setSamplingPriority(span, 0) always only resets flagSampled 390// 391// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can 392// leave flagDebug set 393func setSamplingPriority(s *Span, value interface{}) bool { 394 val, ok := value.(uint16) 395 if !ok { 396 return false 397 } 398 if val == 0 { 399 s.context.samplingState.unsetSampled() 400 s.context.samplingState.setFinal() 401 return true 402 } 403 if s.tracer.options.noDebugFlagOnForcedSampling { 404 s.context.samplingState.setSampled() 405 s.context.samplingState.setFinal() 406 return true 407 } else if s.tracer.isDebugAllowed(s.operationName) { 408 s.context.samplingState.setDebugAndSampled() 409 s.context.samplingState.setFinal() 410 return true 411 } 412 return false 413} 414 415// EnableFirehose enables firehose flag on the span context 416func EnableFirehose(s *Span) { 417 s.Lock() 418 defer s.Unlock() 419 s.context.samplingState.setFirehose() 420} 421