1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger" 16 17import ( 18 "context" 19 "encoding/binary" 20 "encoding/json" 21 "fmt" 22 "sync" 23 24 "go.opentelemetry.io/otel/attribute" 25 "go.opentelemetry.io/otel/codes" 26 gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger" 27 "go.opentelemetry.io/otel/sdk/resource" 28 sdktrace "go.opentelemetry.io/otel/sdk/trace" 29 semconv "go.opentelemetry.io/otel/semconv/v1.7.0" 30 "go.opentelemetry.io/otel/trace" 31) 32 33const ( 34 keyInstrumentationLibraryName = "otel.library.name" 35 keyInstrumentationLibraryVersion = "otel.library.version" 36 keyError = "error" 37 keySpanKind = "span.kind" 38 keyStatusCode = "otel.status_code" 39 keyStatusMessage = "otel.status_description" 40 keyDroppedAttributeCount = "otel.event.dropped_attributes_count" 41 keyEventName = "event" 42) 43 44// New returns an OTel Exporter implementation that exports the collected 45// spans to Jaeger. 46func New(endpointOption EndpointOption) (*Exporter, error) { 47 uploader, err := endpointOption.newBatchUploader() 48 if err != nil { 49 return nil, err 50 } 51 52 // Fetch default service.name from default resource for backup 53 var defaultServiceName string 54 defaultResource := resource.Default() 55 if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists { 56 defaultServiceName = value.AsString() 57 } 58 if defaultServiceName == "" { 59 return nil, fmt.Errorf("failed to get service name from default resource") 60 } 61 62 stopCh := make(chan struct{}) 63 e := &Exporter{ 64 uploader: uploader, 65 stopCh: stopCh, 66 defaultServiceName: defaultServiceName, 67 } 68 return e, nil 69} 70 71// Exporter exports OpenTelemetry spans to a Jaeger agent or collector. 72type Exporter struct { 73 uploader batchUploader 74 stopOnce sync.Once 75 stopCh chan struct{} 76 defaultServiceName string 77} 78 79var _ sdktrace.SpanExporter = (*Exporter)(nil) 80 81// ExportSpans transforms and exports OpenTelemetry spans to Jaeger. 82func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { 83 // Return fast if context is already canceled or Exporter shutdown. 84 select { 85 case <-ctx.Done(): 86 return ctx.Err() 87 case <-e.stopCh: 88 return nil 89 default: 90 } 91 92 // Cancel export if Exporter is shutdown. 93 var cancel context.CancelFunc 94 ctx, cancel = context.WithCancel(ctx) 95 defer cancel() 96 go func(ctx context.Context, cancel context.CancelFunc) { 97 select { 98 case <-ctx.Done(): 99 case <-e.stopCh: 100 cancel() 101 } 102 }(ctx, cancel) 103 104 for _, batch := range jaegerBatchList(spans, e.defaultServiceName) { 105 if err := e.uploader.upload(ctx, batch); err != nil { 106 return err 107 } 108 } 109 110 return nil 111} 112 113// Shutdown stops the Exporter. This will close all connections and release 114// all resources held by the Exporter. 115func (e *Exporter) Shutdown(ctx context.Context) error { 116 // Stop any active and subsequent exports. 117 e.stopOnce.Do(func() { close(e.stopCh) }) 118 select { 119 case <-ctx.Done(): 120 return ctx.Err() 121 default: 122 } 123 return e.uploader.shutdown(ctx) 124} 125 126func spanToThrift(ss sdktrace.ReadOnlySpan) *gen.Span { 127 attr := ss.Attributes() 128 tags := make([]*gen.Tag, 0, len(attr)) 129 for _, kv := range attr { 130 tag := keyValueToTag(kv) 131 if tag != nil { 132 tags = append(tags, tag) 133 } 134 } 135 136 if il := ss.InstrumentationLibrary(); il.Name != "" { 137 tags = append(tags, getStringTag(keyInstrumentationLibraryName, il.Name)) 138 if il.Version != "" { 139 tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, il.Version)) 140 } 141 } 142 143 if ss.SpanKind() != trace.SpanKindInternal { 144 tags = append(tags, 145 getStringTag(keySpanKind, ss.SpanKind().String()), 146 ) 147 } 148 149 if ss.Status().Code != codes.Unset { 150 tags = append(tags, getInt64Tag(keyStatusCode, int64(ss.Status().Code))) 151 if ss.Status().Description != "" { 152 tags = append(tags, getStringTag(keyStatusMessage, ss.Status().Description)) 153 } 154 155 if ss.Status().Code == codes.Error { 156 tags = append(tags, getBoolTag(keyError, true)) 157 } 158 } 159 160 var logs []*gen.Log 161 for _, a := range ss.Events() { 162 nTags := len(a.Attributes) 163 if a.Name != "" { 164 nTags++ 165 } 166 if a.DroppedAttributeCount != 0 { 167 nTags++ 168 } 169 fields := make([]*gen.Tag, 0, nTags) 170 if a.Name != "" { 171 // If an event contains an attribute with the same key, it needs 172 // to be given precedence and overwrite this. 173 fields = append(fields, getStringTag(keyEventName, a.Name)) 174 } 175 for _, kv := range a.Attributes { 176 tag := keyValueToTag(kv) 177 if tag != nil { 178 fields = append(fields, tag) 179 } 180 } 181 if a.DroppedAttributeCount != 0 { 182 fields = append(fields, getInt64Tag(keyDroppedAttributeCount, int64(a.DroppedAttributeCount))) 183 } 184 logs = append(logs, &gen.Log{ 185 Timestamp: a.Time.UnixNano() / 1000, 186 Fields: fields, 187 }) 188 } 189 190 var refs []*gen.SpanRef 191 for _, link := range ss.Links() { 192 tid := link.SpanContext.TraceID() 193 sid := link.SpanContext.SpanID() 194 refs = append(refs, &gen.SpanRef{ 195 TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])), 196 TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])), 197 SpanId: int64(binary.BigEndian.Uint64(sid[:])), 198 RefType: gen.SpanRefType_FOLLOWS_FROM, 199 }) 200 } 201 202 tid := ss.SpanContext().TraceID() 203 sid := ss.SpanContext().SpanID() 204 psid := ss.Parent().SpanID() 205 return &gen.Span{ 206 TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])), 207 TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])), 208 SpanId: int64(binary.BigEndian.Uint64(sid[:])), 209 ParentSpanId: int64(binary.BigEndian.Uint64(psid[:])), 210 OperationName: ss.Name(), // TODO: if span kind is added then add prefix "Sent"/"Recv" 211 Flags: int32(ss.SpanContext().TraceFlags()), 212 StartTime: ss.StartTime().UnixNano() / 1000, 213 Duration: ss.EndTime().Sub(ss.StartTime()).Nanoseconds() / 1000, 214 Tags: tags, 215 Logs: logs, 216 References: refs, 217 } 218} 219 220func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag { 221 var tag *gen.Tag 222 switch keyValue.Value.Type() { 223 case attribute.STRING: 224 s := keyValue.Value.AsString() 225 tag = &gen.Tag{ 226 Key: string(keyValue.Key), 227 VStr: &s, 228 VType: gen.TagType_STRING, 229 } 230 case attribute.BOOL: 231 b := keyValue.Value.AsBool() 232 tag = &gen.Tag{ 233 Key: string(keyValue.Key), 234 VBool: &b, 235 VType: gen.TagType_BOOL, 236 } 237 case attribute.INT64: 238 i := keyValue.Value.AsInt64() 239 tag = &gen.Tag{ 240 Key: string(keyValue.Key), 241 VLong: &i, 242 VType: gen.TagType_LONG, 243 } 244 case attribute.FLOAT64: 245 f := keyValue.Value.AsFloat64() 246 tag = &gen.Tag{ 247 Key: string(keyValue.Key), 248 VDouble: &f, 249 VType: gen.TagType_DOUBLE, 250 } 251 case attribute.BOOLSLICE, 252 attribute.INT64SLICE, 253 attribute.FLOAT64SLICE, 254 attribute.STRINGSLICE: 255 json, _ := json.Marshal(keyValue.Value.AsInterface()) 256 a := (string)(json) 257 tag = &gen.Tag{ 258 Key: string(keyValue.Key), 259 VStr: &a, 260 VType: gen.TagType_STRING, 261 } 262 } 263 return tag 264} 265 266func getInt64Tag(k string, i int64) *gen.Tag { 267 return &gen.Tag{ 268 Key: k, 269 VLong: &i, 270 VType: gen.TagType_LONG, 271 } 272} 273 274func getStringTag(k, s string) *gen.Tag { 275 return &gen.Tag{ 276 Key: k, 277 VStr: &s, 278 VType: gen.TagType_STRING, 279 } 280} 281 282func getBoolTag(k string, b bool) *gen.Tag { 283 return &gen.Tag{ 284 Key: k, 285 VBool: &b, 286 VType: gen.TagType_BOOL, 287 } 288} 289 290// jaegerBatchList transforms a slice of spans into a slice of jaeger Batch. 291func jaegerBatchList(ssl []sdktrace.ReadOnlySpan, defaultServiceName string) []*gen.Batch { 292 if len(ssl) == 0 { 293 return nil 294 } 295 296 batchDict := make(map[attribute.Distinct]*gen.Batch) 297 298 for _, ss := range ssl { 299 if ss == nil { 300 continue 301 } 302 303 resourceKey := ss.Resource().Equivalent() 304 batch, bOK := batchDict[resourceKey] 305 if !bOK { 306 batch = &gen.Batch{ 307 Process: process(ss.Resource(), defaultServiceName), 308 Spans: []*gen.Span{}, 309 } 310 } 311 batch.Spans = append(batch.Spans, spanToThrift(ss)) 312 batchDict[resourceKey] = batch 313 } 314 315 // Transform the categorized map into a slice 316 batchList := make([]*gen.Batch, 0, len(batchDict)) 317 for _, batch := range batchDict { 318 batchList = append(batchList, batch) 319 } 320 return batchList 321} 322 323// process transforms an OTel Resource into a jaeger Process. 324func process(res *resource.Resource, defaultServiceName string) *gen.Process { 325 var process gen.Process 326 327 var serviceName attribute.KeyValue 328 if res != nil { 329 for iter := res.Iter(); iter.Next(); { 330 if iter.Attribute().Key == semconv.ServiceNameKey { 331 serviceName = iter.Attribute() 332 // Don't convert service.name into tag. 333 continue 334 } 335 if tag := keyValueToTag(iter.Attribute()); tag != nil { 336 process.Tags = append(process.Tags, tag) 337 } 338 } 339 } 340 341 // If no service.name is contained in a Span's Resource, 342 // that field MUST be populated from the default Resource. 343 if serviceName.Value.AsString() == "" { 344 serviceName = semconv.ServiceNameKey.String(defaultServiceName) 345 } 346 process.ServiceName = serviceName.Value.AsString() 347 348 return &process 349} 350