1// Copyright (c) 2017-2018 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger 16 17import ( 18 "sync" 19 "sync/atomic" 20 "time" 21 22 "github.com/opentracing/opentracing-go" 23 "github.com/opentracing/opentracing-go/ext" 24 "github.com/opentracing/opentracing-go/log" 25) 26 27// Span implements opentracing.Span 28type Span struct { 29 // referenceCounter used to increase the lifetime of 30 // the object before return it into the pool. 31 referenceCounter int32 32 33 sync.RWMutex 34 35 tracer *Tracer 36 37 // TODO: (breaking change) change to use a pointer 38 context SpanContext 39 40 // The name of the "operation" this span is an instance of. 41 // Known as a "span name" in some implementations. 42 operationName string 43 44 // firstInProcess, if true, indicates that this span is the root of the (sub)tree 45 // of spans in the current process. In other words it's true for the root spans, 46 // and the ingress spans when the process joins another trace. 47 firstInProcess bool 48 49 // startTime is the timestamp indicating when the span began, with microseconds precision. 50 startTime time.Time 51 52 // duration returns duration of the span with microseconds precision. 53 // Zero value means duration is unknown. 54 duration time.Duration 55 56 // tags attached to this span 57 tags []Tag 58 59 // The span's "micro-log" 60 logs []opentracing.LogRecord 61 62 // The number of logs dropped because of MaxLogsPerSpan. 63 numDroppedLogs int 64 65 // references for this span 66 references []Reference 67 68 observer ContribSpanObserver 69} 70 71// Tag is a simple key value wrapper. 72// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead. 73type Tag struct { 74 key string 75 value interface{} 76} 77 78// NewTag creates a new Tag. 79// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead. 80func NewTag(key string, value interface{}) Tag { 81 return Tag{key: key, value: value} 82} 83 84// SetOperationName sets or changes the operation name. 85func (s *Span) SetOperationName(operationName string) opentracing.Span { 86 s.Lock() 87 s.operationName = operationName 88 ctx := s.context 89 s.Unlock() 90 if !ctx.isSamplingFinalized() { 91 decision := s.tracer.sampler.OnSetOperationName(s, operationName) 92 s.applySamplingDecision(decision, true) 93 } 94 s.observer.OnSetOperationName(operationName) 95 return s 96} 97 98// SetTag implements SetTag() of opentracing.Span 99func (s *Span) SetTag(key string, value interface{}) opentracing.Span { 100 return s.setTagInternal(key, value, true) 101} 102 103func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span { 104 var ctx SpanContext 105 var operationName string 106 if lock { 107 ctx = s.SpanContext() 108 operationName = s.OperationName() 109 } else { 110 ctx = s.context 111 operationName = s.operationName 112 } 113 114 s.observer.OnSetTag(key, value) 115 if key == string(ext.SamplingPriority) && !setSamplingPriority(ctx.samplingState, operationName, s.tracer, value) { 116 return s 117 } 118 if !ctx.isSamplingFinalized() { 119 decision := s.tracer.sampler.OnSetTag(s, key, value) 120 s.applySamplingDecision(decision, lock) 121 } 122 if ctx.isWriteable() { 123 if lock { 124 s.Lock() 125 defer s.Unlock() 126 } 127 s.appendTagNoLocking(key, value) 128 } 129 return s 130} 131 132// SpanContext returns span context 133func (s *Span) SpanContext() SpanContext { 134 s.Lock() 135 defer s.Unlock() 136 return s.context 137} 138 139// StartTime returns span start time 140func (s *Span) StartTime() time.Time { 141 s.Lock() 142 defer s.Unlock() 143 return s.startTime 144} 145 146// Duration returns span duration 147func (s *Span) Duration() time.Duration { 148 s.Lock() 149 defer s.Unlock() 150 return s.duration 151} 152 153// Tags returns tags for span 154func (s *Span) Tags() opentracing.Tags { 155 s.Lock() 156 defer s.Unlock() 157 var result = make(opentracing.Tags, len(s.tags)) 158 for _, tag := range s.tags { 159 result[tag.key] = tag.value 160 } 161 return result 162} 163 164// Logs returns micro logs for span 165func (s *Span) Logs() []opentracing.LogRecord { 166 s.Lock() 167 defer s.Unlock() 168 169 logs := append([]opentracing.LogRecord(nil), s.logs...) 170 if s.numDroppedLogs != 0 { 171 fixLogs(logs, s.numDroppedLogs) 172 } 173 174 return logs 175} 176 177// References returns references for this span 178func (s *Span) References() []opentracing.SpanReference { 179 s.Lock() 180 defer s.Unlock() 181 182 if s.references == nil || len(s.references) == 0 { 183 return nil 184 } 185 186 result := make([]opentracing.SpanReference, len(s.references)) 187 for i, r := range s.references { 188 result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context} 189 } 190 return result 191} 192 193func (s *Span) appendTagNoLocking(key string, value interface{}) { 194 s.tags = append(s.tags, Tag{key: key, value: value}) 195} 196 197// LogFields implements opentracing.Span API 198func (s *Span) LogFields(fields ...log.Field) { 199 s.Lock() 200 defer s.Unlock() 201 if !s.context.IsSampled() { 202 return 203 } 204 s.logFieldsNoLocking(fields...) 205} 206 207// this function should only be called while holding a Write lock 208func (s *Span) logFieldsNoLocking(fields ...log.Field) { 209 lr := opentracing.LogRecord{ 210 Fields: fields, 211 Timestamp: time.Now(), 212 } 213 s.appendLogNoLocking(lr) 214} 215 216// LogKV implements opentracing.Span API 217func (s *Span) LogKV(alternatingKeyValues ...interface{}) { 218 s.RLock() 219 sampled := s.context.IsSampled() 220 s.RUnlock() 221 if !sampled { 222 return 223 } 224 fields, err := log.InterleavedKVToFields(alternatingKeyValues...) 225 if err != nil { 226 s.LogFields(log.Error(err), log.String("function", "LogKV")) 227 return 228 } 229 s.LogFields(fields...) 230} 231 232// LogEvent implements opentracing.Span API 233func (s *Span) LogEvent(event string) { 234 s.Log(opentracing.LogData{Event: event}) 235} 236 237// LogEventWithPayload implements opentracing.Span API 238func (s *Span) LogEventWithPayload(event string, payload interface{}) { 239 s.Log(opentracing.LogData{Event: event, Payload: payload}) 240} 241 242// Log implements opentracing.Span API 243func (s *Span) Log(ld opentracing.LogData) { 244 s.Lock() 245 defer s.Unlock() 246 if s.context.IsSampled() { 247 if ld.Timestamp.IsZero() { 248 ld.Timestamp = s.tracer.timeNow() 249 } 250 s.appendLogNoLocking(ld.ToLogRecord()) 251 } 252} 253 254// this function should only be called while holding a Write lock 255func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) { 256 maxLogs := s.tracer.options.maxLogsPerSpan 257 if maxLogs == 0 || len(s.logs) < maxLogs { 258 s.logs = append(s.logs, lr) 259 return 260 } 261 262 // We have too many logs. We don't touch the first numOld logs; we treat the 263 // rest as a circular buffer and overwrite the oldest log among those. 264 numOld := (maxLogs - 1) / 2 265 numNew := maxLogs - numOld 266 s.logs[numOld+s.numDroppedLogs%numNew] = lr 267 s.numDroppedLogs++ 268} 269 270// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at 271// the end (i.e. pos circular left shifts). 272func rotateLogBuffer(buf []opentracing.LogRecord, pos int) { 273 // This algorithm is described in: 274 // http://www.cplusplus.com/reference/algorithm/rotate 275 for first, middle, next := 0, pos, pos; first != middle; { 276 buf[first], buf[next] = buf[next], buf[first] 277 first++ 278 next++ 279 if next == len(buf) { 280 next = middle 281 } else if first == middle { 282 middle = next 283 } 284 } 285} 286 287func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) { 288 // We dropped some log events, which means that we used part of Logs as a 289 // circular buffer (see appendLog). De-circularize it. 290 numOld := (len(logs) - 1) / 2 291 numNew := len(logs) - numOld 292 rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew) 293 294 // Replace the log in the middle (the oldest "new" log) with information 295 // about the dropped logs. This means that we are effectively dropping one 296 // more "new" log. 297 numDropped := numDroppedLogs + 1 298 logs[numOld] = opentracing.LogRecord{ 299 // Keep the timestamp of the last dropped event. 300 Timestamp: logs[numOld].Timestamp, 301 Fields: []log.Field{ 302 log.String("event", "dropped Span logs"), 303 log.Int("dropped_log_count", numDropped), 304 log.String("component", "jaeger-client"), 305 }, 306 } 307} 308 309func (s *Span) fixLogsIfDropped() { 310 if s.numDroppedLogs == 0 { 311 return 312 } 313 fixLogs(s.logs, s.numDroppedLogs) 314 s.numDroppedLogs = 0 315} 316 317// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext. 318// The call is proxied via tracer.baggageSetter to allow policies to be applied 319// before allowing to set/replace baggage keys. 320// The setter eventually stores a new SpanContext with extended baggage: 321// 322// span.context = span.context.WithBaggageItem(key, value) 323// 324// See SpanContext.WithBaggageItem() for explanation why it's done this way. 325func (s *Span) SetBaggageItem(key, value string) opentracing.Span { 326 s.Lock() 327 defer s.Unlock() 328 s.tracer.setBaggage(s, key, value) 329 return s 330} 331 332// BaggageItem implements BaggageItem() of opentracing.SpanContext 333func (s *Span) BaggageItem(key string) string { 334 s.RLock() 335 defer s.RUnlock() 336 return s.context.baggage[key] 337} 338 339// Finish implements opentracing.Span API 340// After finishing the Span object it returns back to the allocator unless the reporter retains it again, 341// so after that, the Span object should no longer be used because it won't be valid anymore. 342func (s *Span) Finish() { 343 s.FinishWithOptions(opentracing.FinishOptions{}) 344} 345 346// FinishWithOptions implements opentracing.Span API 347func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { 348 if options.FinishTime.IsZero() { 349 options.FinishTime = s.tracer.timeNow() 350 } 351 s.observer.OnFinish(options) 352 s.Lock() 353 s.duration = options.FinishTime.Sub(s.startTime) 354 ctx := s.context 355 s.Unlock() 356 if !ctx.isSamplingFinalized() { 357 decision := s.tracer.sampler.OnFinishSpan(s) 358 s.applySamplingDecision(decision, true) 359 } 360 if ctx.IsSampled() { 361 s.Lock() 362 s.fixLogsIfDropped() 363 if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 { 364 // Note: bulk logs are not subject to maxLogsPerSpan limit 365 if options.LogRecords != nil { 366 s.logs = append(s.logs, options.LogRecords...) 367 } 368 for _, ld := range options.BulkLogData { 369 s.logs = append(s.logs, ld.ToLogRecord()) 370 } 371 } 372 s.Unlock() 373 } 374 // call reportSpan even for non-sampled traces, to return span to the pool 375 // and update metrics counter 376 s.tracer.reportSpan(s) 377} 378 379// Context implements opentracing.Span API 380func (s *Span) Context() opentracing.SpanContext { 381 s.Lock() 382 defer s.Unlock() 383 return s.context 384} 385 386// Tracer implements opentracing.Span API 387func (s *Span) Tracer() opentracing.Tracer { 388 return s.tracer 389} 390 391func (s *Span) String() string { 392 s.RLock() 393 defer s.RUnlock() 394 return s.context.String() 395} 396 397// OperationName allows retrieving current operation name. 398func (s *Span) OperationName() string { 399 s.RLock() 400 defer s.RUnlock() 401 return s.operationName 402} 403 404// Retain increases object counter to increase the lifetime of the object 405func (s *Span) Retain() *Span { 406 atomic.AddInt32(&s.referenceCounter, 1) 407 return s 408} 409 410// Release decrements object counter and return to the 411// allocator manager when counter will below zero 412func (s *Span) Release() { 413 if atomic.AddInt32(&s.referenceCounter, -1) == -1 { 414 s.tracer.spanAllocator.Put(s) 415 } 416} 417 418// reset span state and release unused data 419func (s *Span) reset() { 420 s.firstInProcess = false 421 s.context = emptyContext 422 s.operationName = "" 423 s.tracer = nil 424 s.startTime = time.Time{} 425 s.duration = 0 426 s.observer = nil 427 atomic.StoreInt32(&s.referenceCounter, 0) 428 429 // Note: To reuse memory we can save the pointers on the heap 430 s.tags = s.tags[:0] 431 s.logs = s.logs[:0] 432 s.numDroppedLogs = 0 433 s.references = s.references[:0] 434} 435 436func (s *Span) serviceName() string { 437 return s.tracer.serviceName 438} 439 440func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) { 441 var ctx SpanContext 442 if lock { 443 ctx = s.SpanContext() 444 } else { 445 ctx = s.context 446 } 447 448 if !decision.Retryable { 449 ctx.samplingState.setFinal() 450 } 451 if decision.Sample { 452 ctx.samplingState.setSampled() 453 if len(decision.Tags) > 0 { 454 if lock { 455 s.Lock() 456 defer s.Unlock() 457 } 458 for _, tag := range decision.Tags { 459 s.appendTagNoLocking(tag.key, tag.value) 460 } 461 } 462 } 463} 464 465// setSamplingPriority returns true if the flag was updated successfully, false otherwise. 466// The behavior of setSamplingPriority is surprising 467// If noDebugFlagOnForcedSampling is set 468// setSamplingPriority(..., 1) always sets only flagSampled 469// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes 470// setSamplingPriority(..., 1) sets both flagSampled and flagDebug 471// However, 472// setSamplingPriority(..., 0) always only resets flagSampled 473// 474// This means that doing a setSamplingPriority(..., 1) followed by setSamplingPriority(..., 0) can 475// leave flagDebug set 476func setSamplingPriority(state *samplingState, operationName string, tracer *Tracer, value interface{}) bool { 477 val, ok := value.(uint16) 478 if !ok { 479 return false 480 } 481 if val == 0 { 482 state.unsetSampled() 483 state.setFinal() 484 return true 485 } 486 if tracer.options.noDebugFlagOnForcedSampling { 487 state.setSampled() 488 state.setFinal() 489 return true 490 } else if tracer.isDebugAllowed(operationName) { 491 state.setDebugAndSampled() 492 state.setFinal() 493 return true 494 } 495 return false 496} 497 498// EnableFirehose enables firehose flag on the span context 499func EnableFirehose(s *Span) { 500 s.Lock() 501 defer s.Unlock() 502 s.context.samplingState.setFirehose() 503} 504