// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jaeger // import "go.opentelemetry.io/otel/exporters/trace/jaeger"

import (
	"context"
	"encoding/binary"
	"fmt"
	"sync"

	"google.golang.org/api/support/bundler"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/codes"
	gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
	"go.opentelemetry.io/otel/label"
	export "go.opentelemetry.io/otel/sdk/export/trace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"
)

const (
	// defaultServiceName is the Jaeger service name reported when the
	// configured Process has an empty ServiceName.
	defaultServiceName = "OpenTelemetry"

	// Tag keys used to report the instrumentation library (name and, when
	// set, version) that produced a span.
	keyInstrumentationLibraryName    = "otel.instrumentation_library.name"
	keyInstrumentationLibraryVersion = "otel.instrumentation_library.version"
)

// Option configures the Jaeger exporter and export pipeline by mutating
// the unexported options struct.
type Option func(*options)

// options are the options to be used when initializing a Jaeger export.
type options struct {
	// Process contains the information about the exporting process.
	Process Process

	// BufferMaxCount defines the total number of spans that can be buffered
	// in memory. Zero keeps the bundler's default limit.
	BufferMaxCount int

	// BatchMaxCount defines the maximum number of spans sent in one batch.
	// Zero keeps the bundler's default threshold.
	BatchMaxCount int

	// Config is the SDK config applied to the TracerProvider built by
	// NewExportPipeline. Nil leaves the SDK defaults in place.
	Config *sdktrace.Config

	// Disabled makes NewExportPipeline return a no-op TracerProvider.
	// NewRawExporter ignores it.
	Disabled bool
}

// WithProcess sets the process with the information about the exporting process.
60func WithProcess(process Process) Option { 61 return func(o *options) { 62 o.Process = process 63 } 64} 65 66// WithBufferMaxCount defines the total number of traces that can be buffered in memory 67func WithBufferMaxCount(bufferMaxCount int) Option { 68 return func(o *options) { 69 o.BufferMaxCount = bufferMaxCount 70 } 71} 72 73// WithBatchMaxCount defines the maximum number of spans in one batch 74func WithBatchMaxCount(batchMaxCount int) Option { 75 return func(o *options) { 76 o.BatchMaxCount = batchMaxCount 77 } 78} 79 80// WithSDK sets the SDK config for the exporter pipeline. 81func WithSDK(config *sdktrace.Config) Option { 82 return func(o *options) { 83 o.Config = config 84 } 85} 86 87// WithDisabled option will cause pipeline methods to use 88// a no-op provider 89func WithDisabled(disabled bool) Option { 90 return func(o *options) { 91 o.Disabled = disabled 92 } 93} 94 95// NewRawExporter returns an OTel Exporter implementation that exports the 96// collected spans to Jaeger. 97// 98// It will IGNORE Disabled option. 
99func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) { 100 uploader, err := endpointOption() 101 if err != nil { 102 return nil, err 103 } 104 105 o := options{} 106 opts = append(opts, WithProcessFromEnv()) 107 for _, opt := range opts { 108 opt(&o) 109 } 110 111 service := o.Process.ServiceName 112 if service == "" { 113 service = defaultServiceName 114 } 115 tags := make([]*gen.Tag, 0, len(o.Process.Tags)) 116 for _, tag := range o.Process.Tags { 117 t := keyValueToTag(tag) 118 if t != nil { 119 tags = append(tags, t) 120 } 121 } 122 e := &Exporter{ 123 uploader: uploader, 124 process: &gen.Process{ 125 ServiceName: service, 126 Tags: tags, 127 }, 128 o: o, 129 } 130 bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) { 131 if err := e.upload(bundle.([]*gen.Span)); err != nil { 132 otel.Handle(err) 133 } 134 }) 135 136 // Set BufferedByteLimit with the total number of spans that are permissible to be held in memory. 137 // This needs to be done since the size of messages is always set to 1. Failing to set this would allow 138 // 1G messages to be held in memory since that is the default value of BufferedByteLimit. 139 if o.BufferMaxCount != 0 { 140 bundler.BufferedByteLimit = o.BufferMaxCount 141 } 142 143 // The default value bundler uses is 10, increase to send larger batches 144 if o.BatchMaxCount != 0 { 145 bundler.BundleCountThreshold = o.BatchMaxCount 146 } 147 148 e.bundler = bundler 149 return e, nil 150} 151 152// NewExportPipeline sets up a complete export pipeline 153// with the recommended setup for trace provider 154func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (trace.TracerProvider, func(), error) { 155 o := options{} 156 opts = append(opts, WithDisabledFromEnv()) 157 for _, opt := range opts { 158 opt(&o) 159 } 160 if o.Disabled { 161 return trace.NewNoopTracerProvider(), func() {}, nil 162 } 163 164 exporter, err := NewRawExporter(endpointOption, opts...) 
165 if err != nil { 166 return nil, nil, err 167 } 168 169 pOpts := []sdktrace.TracerProviderOption{sdktrace.WithSyncer(exporter)} 170 if exporter.o.Config != nil { 171 pOpts = append(pOpts, sdktrace.WithConfig(*exporter.o.Config)) 172 } 173 tp := sdktrace.NewTracerProvider(pOpts...) 174 return tp, exporter.Flush, nil 175} 176 177// InstallNewPipeline instantiates a NewExportPipeline with the 178// recommended configuration and registers it globally. 179func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (func(), error) { 180 tp, flushFn, err := NewExportPipeline(endpointOption, opts...) 181 if err != nil { 182 return nil, err 183 } 184 185 otel.SetTracerProvider(tp) 186 return flushFn, nil 187} 188 189// Process contains the information exported to jaeger about the source 190// of the trace data. 191type Process struct { 192 // ServiceName is the Jaeger service name. 193 ServiceName string 194 195 // Tags are added to Jaeger Process exports 196 Tags []label.KeyValue 197} 198 199// Exporter is an implementation of an OTel SpanSyncer that uploads spans to 200// Jaeger. 201type Exporter struct { 202 process *gen.Process 203 bundler *bundler.Bundler 204 uploader batchUploader 205 o options 206 207 stoppedMu sync.RWMutex 208 stopped bool 209} 210 211var _ export.SpanExporter = (*Exporter)(nil) 212 213// ExportSpans exports SpanData to Jaeger. 214func (e *Exporter) ExportSpans(ctx context.Context, spans []*export.SpanData) error { 215 e.stoppedMu.RLock() 216 stopped := e.stopped 217 e.stoppedMu.RUnlock() 218 if stopped { 219 return nil 220 } 221 222 for _, span := range spans { 223 // TODO(jbd): Handle oversized bundlers. 224 err := e.bundler.Add(spanDataToThrift(span), 1) 225 if err != nil { 226 return fmt.Errorf("failed to bundle %q: %w", span.Name, err) 227 } 228 } 229 return nil 230} 231 232// flush is used to wrap the bundler's Flush method for testing. 
233var flush = func(e *Exporter) { 234 e.bundler.Flush() 235} 236 237// Shutdown stops the exporter flushing any pending exports. 238func (e *Exporter) Shutdown(ctx context.Context) error { 239 e.stoppedMu.Lock() 240 e.stopped = true 241 e.stoppedMu.Unlock() 242 243 done := make(chan struct{}, 1) 244 // Shadow so if the goroutine is leaked in testing it doesn't cause a race 245 // condition when the file level var is reset. 246 go func(FlushFunc func(*Exporter)) { 247 // The OpenTelemetry specification is explicit in not having this 248 // method block so the preference here is to orphan this goroutine if 249 // the context is canceled or times out while this flushing is 250 // occurring. This is a consequence of the bundler Flush method not 251 // supporting a context. 252 FlushFunc(e) 253 done <- struct{}{} 254 }(flush) 255 select { 256 case <-ctx.Done(): 257 return ctx.Err() 258 case <-done: 259 } 260 return nil 261} 262 263func spanDataToThrift(data *export.SpanData) *gen.Span { 264 tags := make([]*gen.Tag, 0, len(data.Attributes)) 265 for _, kv := range data.Attributes { 266 tag := keyValueToTag(kv) 267 if tag != nil { 268 tags = append(tags, tag) 269 } 270 } 271 272 // TODO (jmacd): OTel has a broad "last value wins" 273 // semantic. Should resources be appended before span 274 // attributes, above, to allow span attributes to 275 // overwrite resource attributes? 
276 if data.Resource != nil { 277 for iter := data.Resource.Iter(); iter.Next(); { 278 if tag := keyValueToTag(iter.Attribute()); tag != nil { 279 tags = append(tags, tag) 280 } 281 } 282 } 283 if il := data.InstrumentationLibrary; il.Name != "" { 284 tags = append(tags, getStringTag(keyInstrumentationLibraryName, il.Name)) 285 if il.Version != "" { 286 tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, il.Version)) 287 } 288 } 289 290 tags = append(tags, 291 getInt64Tag("status.code", int64(data.StatusCode)), 292 getStringTag("status.message", data.StatusMessage), 293 getStringTag("span.kind", data.SpanKind.String()), 294 ) 295 296 // Ensure that if Status.Code is not OK, that we set the "error" tag on the Jaeger span. 297 // See Issue https://github.com/census-instrumentation/opencensus-go/issues/1041 298 if data.StatusCode != codes.Ok && data.StatusCode != codes.Unset { 299 tags = append(tags, getBoolTag("error", true)) 300 } 301 302 var logs []*gen.Log 303 for _, a := range data.MessageEvents { 304 fields := make([]*gen.Tag, 0, len(a.Attributes)) 305 for _, kv := range a.Attributes { 306 tag := keyValueToTag(kv) 307 if tag != nil { 308 fields = append(fields, tag) 309 } 310 } 311 fields = append(fields, getStringTag("name", a.Name)) 312 logs = append(logs, &gen.Log{ 313 Timestamp: a.Time.UnixNano() / 1000, 314 Fields: fields, 315 }) 316 } 317 318 var refs []*gen.SpanRef 319 for _, link := range data.Links { 320 refs = append(refs, &gen.SpanRef{ 321 TraceIdHigh: int64(binary.BigEndian.Uint64(link.TraceID[0:8])), 322 TraceIdLow: int64(binary.BigEndian.Uint64(link.TraceID[8:16])), 323 SpanId: int64(binary.BigEndian.Uint64(link.SpanID[:])), 324 // TODO(paivagustavo): properly set the reference type when specs are defined 325 // see https://github.com/open-telemetry/opentelemetry-specification/issues/65 326 RefType: gen.SpanRefType_CHILD_OF, 327 }) 328 } 329 330 return &gen.Span{ 331 TraceIdHigh: 
int64(binary.BigEndian.Uint64(data.SpanContext.TraceID[0:8])), 332 TraceIdLow: int64(binary.BigEndian.Uint64(data.SpanContext.TraceID[8:16])), 333 SpanId: int64(binary.BigEndian.Uint64(data.SpanContext.SpanID[:])), 334 ParentSpanId: int64(binary.BigEndian.Uint64(data.ParentSpanID[:])), 335 OperationName: data.Name, // TODO: if span kind is added then add prefix "Sent"/"Recv" 336 Flags: int32(data.SpanContext.TraceFlags), 337 StartTime: data.StartTime.UnixNano() / 1000, 338 Duration: data.EndTime.Sub(data.StartTime).Nanoseconds() / 1000, 339 Tags: tags, 340 Logs: logs, 341 References: refs, 342 } 343} 344 345func keyValueToTag(keyValue label.KeyValue) *gen.Tag { 346 var tag *gen.Tag 347 switch keyValue.Value.Type() { 348 case label.STRING: 349 s := keyValue.Value.AsString() 350 tag = &gen.Tag{ 351 Key: string(keyValue.Key), 352 VStr: &s, 353 VType: gen.TagType_STRING, 354 } 355 case label.BOOL: 356 b := keyValue.Value.AsBool() 357 tag = &gen.Tag{ 358 Key: string(keyValue.Key), 359 VBool: &b, 360 VType: gen.TagType_BOOL, 361 } 362 case label.INT32: 363 i := int64(keyValue.Value.AsInt32()) 364 tag = &gen.Tag{ 365 Key: string(keyValue.Key), 366 VLong: &i, 367 VType: gen.TagType_LONG, 368 } 369 case label.INT64: 370 i := keyValue.Value.AsInt64() 371 tag = &gen.Tag{ 372 Key: string(keyValue.Key), 373 VLong: &i, 374 VType: gen.TagType_LONG, 375 } 376 case label.UINT32: 377 i := int64(keyValue.Value.AsUint32()) 378 tag = &gen.Tag{ 379 Key: string(keyValue.Key), 380 VLong: &i, 381 VType: gen.TagType_LONG, 382 } 383 case label.UINT64: 384 // we'll ignore the value if it overflows 385 if i := int64(keyValue.Value.AsUint64()); i >= 0 { 386 tag = &gen.Tag{ 387 Key: string(keyValue.Key), 388 VLong: &i, 389 VType: gen.TagType_LONG, 390 } 391 } 392 case label.FLOAT32: 393 f := float64(keyValue.Value.AsFloat32()) 394 tag = &gen.Tag{ 395 Key: string(keyValue.Key), 396 VDouble: &f, 397 VType: gen.TagType_DOUBLE, 398 } 399 case label.FLOAT64: 400 f := keyValue.Value.AsFloat64() 401 
tag = &gen.Tag{ 402 Key: string(keyValue.Key), 403 VDouble: &f, 404 VType: gen.TagType_DOUBLE, 405 } 406 } 407 return tag 408} 409 410func getInt64Tag(k string, i int64) *gen.Tag { 411 return &gen.Tag{ 412 Key: k, 413 VLong: &i, 414 VType: gen.TagType_LONG, 415 } 416} 417 418func getStringTag(k, s string) *gen.Tag { 419 return &gen.Tag{ 420 Key: k, 421 VStr: &s, 422 VType: gen.TagType_STRING, 423 } 424} 425 426func getBoolTag(k string, b bool) *gen.Tag { 427 return &gen.Tag{ 428 Key: k, 429 VBool: &b, 430 VType: gen.TagType_BOOL, 431 } 432} 433 434// Flush waits for exported trace spans to be uploaded. 435// 436// This is useful if your program is ending and you do not want to lose recent spans. 437func (e *Exporter) Flush() { 438 flush(e) 439} 440 441func (e *Exporter) upload(spans []*gen.Span) error { 442 batch := &gen.Batch{ 443 Spans: spans, 444 Process: e.process, 445 } 446 447 return e.uploader.upload(batch) 448} 449