1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger // import "go.opentelemetry.io/otel/exporters/trace/jaeger" 16 17import ( 18 "context" 19 "encoding/binary" 20 "encoding/json" 21 "fmt" 22 "sync" 23 24 "go.opentelemetry.io/otel" 25 "go.opentelemetry.io/otel/attribute" 26 "go.opentelemetry.io/otel/codes" 27 gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger" 28 "go.opentelemetry.io/otel/sdk/resource" 29 sdktrace "go.opentelemetry.io/otel/sdk/trace" 30 "go.opentelemetry.io/otel/semconv" 31 "go.opentelemetry.io/otel/trace" 32) 33 34const ( 35 keyInstrumentationLibraryName = "otel.library.name" 36 keyInstrumentationLibraryVersion = "otel.library.version" 37 keyError = "error" 38 keySpanKind = "span.kind" 39 keyStatusCode = "otel.status_code" 40 keyStatusMessage = "otel.status_description" 41 keyDroppedAttributeCount = "otel.event.dropped_attributes_count" 42 keyEventName = "event" 43) 44 45// NewRawExporter returns an OTel Exporter implementation that exports the 46// collected spans to Jaeger. 47func NewRawExporter(endpointOption EndpointOption) (*Exporter, error) { 48 uploader, err := endpointOption() 49 if err != nil { 50 return nil, err 51 } 52 53 // Fetch default service.name from default resource for backup 54 var defaultServiceName string 55 defaultResource := resource.Default() 56 if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists { 57 defaultServiceName = value.AsString() 58 } 59 if defaultServiceName == "" { 60 return nil, fmt.Errorf("failed to get service name from default resource") 61 } 62 63 stopCh := make(chan struct{}) 64 e := &Exporter{ 65 uploader: uploader, 66 stopCh: stopCh, 67 defaultServiceName: defaultServiceName, 68 } 69 return e, nil 70} 71 72// NewExportPipeline sets up a complete export pipeline 73// with the recommended setup for trace provider 74func NewExportPipeline(endpointOption EndpointOption) (*sdktrace.TracerProvider, error) { 75 exporter, err := NewRawExporter(endpointOption) 76 if err != nil { 77 return nil, err 78 } 79 80 tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter)) 81 return tp, nil 82} 83 84// InstallNewPipeline instantiates a NewExportPipeline with the 85// recommended configuration and registers it globally. 86func InstallNewPipeline(endpointOption EndpointOption) (*sdktrace.TracerProvider, error) { 87 tp, err := NewExportPipeline(endpointOption) 88 if err != nil { 89 return tp, err 90 } 91 92 otel.SetTracerProvider(tp) 93 return tp, nil 94} 95 96// Exporter exports OpenTelemetry spans to a Jaeger agent or collector. 97type Exporter struct { 98 uploader batchUploader 99 stopOnce sync.Once 100 stopCh chan struct{} 101 defaultServiceName string 102} 103 104var _ sdktrace.SpanExporter = (*Exporter)(nil) 105 106// ExportSpans transforms and exports OpenTelemetry spans to Jaeger. 107func (e *Exporter) ExportSpans(ctx context.Context, spans []*sdktrace.SpanSnapshot) error { 108 // Return fast if context is already canceled or Exporter shutdown. 109 select { 110 case <-ctx.Done(): 111 return ctx.Err() 112 case <-e.stopCh: 113 return nil 114 default: 115 } 116 117 // Cancel export if Exporter is shutdown. 118 var cancel context.CancelFunc 119 ctx, cancel = context.WithCancel(ctx) 120 defer cancel() 121 go func(ctx context.Context, cancel context.CancelFunc) { 122 select { 123 case <-ctx.Done(): 124 case <-e.stopCh: 125 cancel() 126 } 127 }(ctx, cancel) 128 129 for _, batch := range jaegerBatchList(spans, e.defaultServiceName) { 130 if err := e.uploader.upload(ctx, batch); err != nil { 131 return err 132 } 133 } 134 135 return nil 136} 137 138// Shutdown stops the Exporter. This will close all connections and release 139// all resources held by the Exporter. 140func (e *Exporter) Shutdown(ctx context.Context) error { 141 // Stop any active and subsequent exports. 142 e.stopOnce.Do(func() { close(e.stopCh) }) 143 select { 144 case <-ctx.Done(): 145 return ctx.Err() 146 default: 147 } 148 return e.uploader.shutdown(ctx) 149} 150 151func spanSnapshotToThrift(ss *sdktrace.SpanSnapshot) *gen.Span { 152 tags := make([]*gen.Tag, 0, len(ss.Attributes)) 153 for _, kv := range ss.Attributes { 154 tag := keyValueToTag(kv) 155 if tag != nil { 156 tags = append(tags, tag) 157 } 158 } 159 160 if il := ss.InstrumentationLibrary; il.Name != "" { 161 tags = append(tags, getStringTag(keyInstrumentationLibraryName, il.Name)) 162 if il.Version != "" { 163 tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, il.Version)) 164 } 165 } 166 167 if ss.SpanKind != trace.SpanKindInternal { 168 tags = append(tags, 169 getStringTag(keySpanKind, ss.SpanKind.String()), 170 ) 171 } 172 173 if ss.StatusCode != codes.Unset { 174 tags = append(tags, getInt64Tag(keyStatusCode, int64(ss.StatusCode))) 175 if ss.StatusMessage != "" { 176 tags = append(tags, getStringTag(keyStatusMessage, ss.StatusMessage)) 177 } 178 179 if ss.StatusCode == codes.Error { 180 tags = append(tags, getBoolTag(keyError, true)) 181 } 182 } 183 184 var logs []*gen.Log 185 for _, a := range ss.MessageEvents { 186 nTags := len(a.Attributes) 187 if a.Name != "" { 188 nTags++ 189 } 190 if a.DroppedAttributeCount != 0 { 191 nTags++ 192 } 193 fields := make([]*gen.Tag, 0, nTags) 194 if a.Name != "" { 195 // If an event contains an attribute with the same key, it needs 196 // to be given precedence and overwrite this. 197 fields = append(fields, getStringTag(keyEventName, a.Name)) 198 } 199 for _, kv := range a.Attributes { 200 tag := keyValueToTag(kv) 201 if tag != nil { 202 fields = append(fields, tag) 203 } 204 } 205 if a.DroppedAttributeCount != 0 { 206 fields = append(fields, getInt64Tag(keyDroppedAttributeCount, int64(a.DroppedAttributeCount))) 207 } 208 logs = append(logs, &gen.Log{ 209 Timestamp: a.Time.UnixNano() / 1000, 210 Fields: fields, 211 }) 212 } 213 214 var refs []*gen.SpanRef 215 for _, link := range ss.Links { 216 tid := link.TraceID() 217 sid := link.SpanID() 218 refs = append(refs, &gen.SpanRef{ 219 TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])), 220 TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])), 221 SpanId: int64(binary.BigEndian.Uint64(sid[:])), 222 RefType: gen.SpanRefType_FOLLOWS_FROM, 223 }) 224 } 225 226 tid := ss.SpanContext.TraceID() 227 sid := ss.SpanContext.SpanID() 228 psid := ss.Parent.SpanID() 229 return &gen.Span{ 230 TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])), 231 TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])), 232 SpanId: int64(binary.BigEndian.Uint64(sid[:])), 233 ParentSpanId: int64(binary.BigEndian.Uint64(psid[:])), 234 OperationName: ss.Name, // TODO: if span kind is added then add prefix "Sent"/"Recv" 235 Flags: int32(ss.SpanContext.TraceFlags()), 236 StartTime: ss.StartTime.UnixNano() / 1000, 237 Duration: ss.EndTime.Sub(ss.StartTime).Nanoseconds() / 1000, 238 Tags: tags, 239 Logs: logs, 240 References: refs, 241 } 242} 243 244func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag { 245 var tag *gen.Tag 246 switch keyValue.Value.Type() { 247 case attribute.STRING: 248 s := keyValue.Value.AsString() 249 tag = &gen.Tag{ 250 Key: string(keyValue.Key), 251 VStr: &s, 252 VType: gen.TagType_STRING, 253 } 254 case attribute.BOOL: 255 b := keyValue.Value.AsBool() 256 tag = &gen.Tag{ 257 Key: string(keyValue.Key), 258 VBool: &b, 259 VType: gen.TagType_BOOL, 260 } 261 case attribute.INT64: 262 i := keyValue.Value.AsInt64() 263 tag = &gen.Tag{ 264 Key: string(keyValue.Key), 265 VLong: &i, 266 VType: gen.TagType_LONG, 267 } 268 case attribute.FLOAT64: 269 f := keyValue.Value.AsFloat64() 270 tag = &gen.Tag{ 271 Key: string(keyValue.Key), 272 VDouble: &f, 273 VType: gen.TagType_DOUBLE, 274 } 275 case attribute.ARRAY: 276 json, _ := json.Marshal(keyValue.Value.AsArray()) 277 a := (string)(json) 278 tag = &gen.Tag{ 279 Key: string(keyValue.Key), 280 VStr: &a, 281 VType: gen.TagType_STRING, 282 } 283 } 284 return tag 285} 286 287func getInt64Tag(k string, i int64) *gen.Tag { 288 return &gen.Tag{ 289 Key: k, 290 VLong: &i, 291 VType: gen.TagType_LONG, 292 } 293} 294 295func getStringTag(k, s string) *gen.Tag { 296 return &gen.Tag{ 297 Key: k, 298 VStr: &s, 299 VType: gen.TagType_STRING, 300 } 301} 302 303func getBoolTag(k string, b bool) *gen.Tag { 304 return &gen.Tag{ 305 Key: k, 306 VBool: &b, 307 VType: gen.TagType_BOOL, 308 } 309} 310 311// jaegerBatchList transforms a slice of SpanSnapshot into a slice of jaeger 312// Batch. 313func jaegerBatchList(ssl []*sdktrace.SpanSnapshot, defaultServiceName string) []*gen.Batch { 314 if len(ssl) == 0 { 315 return nil 316 } 317 318 batchDict := make(map[attribute.Distinct]*gen.Batch) 319 320 for _, ss := range ssl { 321 if ss == nil { 322 continue 323 } 324 325 resourceKey := ss.Resource.Equivalent() 326 batch, bOK := batchDict[resourceKey] 327 if !bOK { 328 batch = &gen.Batch{ 329 Process: process(ss.Resource, defaultServiceName), 330 Spans: []*gen.Span{}, 331 } 332 } 333 batch.Spans = append(batch.Spans, spanSnapshotToThrift(ss)) 334 batchDict[resourceKey] = batch 335 } 336 337 // Transform the categorized map into a slice 338 batchList := make([]*gen.Batch, 0, len(batchDict)) 339 for _, batch := range batchDict { 340 batchList = append(batchList, batch) 341 } 342 return batchList 343} 344 345// process transforms an OTel Resource into a jaeger Process. 346func process(res *resource.Resource, defaultServiceName string) *gen.Process { 347 var process gen.Process 348 349 var serviceName attribute.KeyValue 350 if res != nil { 351 for iter := res.Iter(); iter.Next(); { 352 if iter.Attribute().Key == semconv.ServiceNameKey { 353 serviceName = iter.Attribute() 354 // Don't convert service.name into tag. 355 continue 356 } 357 if tag := keyValueToTag(iter.Attribute()); tag != nil { 358 process.Tags = append(process.Tags, tag) 359 } 360 } 361 } 362 363 // If no service.name is contained in a Span's Resource, 364 // that field MUST be populated from the default Resource. 365 if serviceName.Value.AsString() == "" { 366 serviceName = semconv.ServiceNameKey.String(defaultServiceName) 367 } 368 process.ServiceName = serviceName.Value.AsString() 369 370 return &process 371} 372