1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
16
17import (
18	"context"
19	"encoding/binary"
20	"encoding/json"
21	"fmt"
22	"sync"
23
24	"go.opentelemetry.io/otel/attribute"
25	"go.opentelemetry.io/otel/codes"
26	gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
27	"go.opentelemetry.io/otel/sdk/resource"
28	sdktrace "go.opentelemetry.io/otel/sdk/trace"
29	semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
30	"go.opentelemetry.io/otel/trace"
31)
32
// Keys of the tags and log fields that the exporter adds on top of the
// attributes supplied by the user.
const (
	// Span tags: instrumentation library, span kind and span status.
	keyInstrumentationLibraryName    = "otel.library.name"
	keyInstrumentationLibraryVersion = "otel.library.version"
	keySpanKind                      = "span.kind"
	keyStatusCode                    = "otel.status_code"
	keyStatusMessage                 = "otel.status_description"
	keyError                         = "error"

	// Log fields: event name and number of attributes dropped from an event.
	keyEventName             = "event"
	keyDroppedAttributeCount = "otel.event.dropped_attributes_count"
)
43
44// New returns an OTel Exporter implementation that exports the collected
45// spans to Jaeger.
46func New(endpointOption EndpointOption) (*Exporter, error) {
47	uploader, err := endpointOption.newBatchUploader()
48	if err != nil {
49		return nil, err
50	}
51
52	// Fetch default service.name from default resource for backup
53	var defaultServiceName string
54	defaultResource := resource.Default()
55	if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists {
56		defaultServiceName = value.AsString()
57	}
58	if defaultServiceName == "" {
59		return nil, fmt.Errorf("failed to get service name from default resource")
60	}
61
62	stopCh := make(chan struct{})
63	e := &Exporter{
64		uploader:           uploader,
65		stopCh:             stopCh,
66		defaultServiceName: defaultServiceName,
67	}
68	return e, nil
69}
70
71// Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
72type Exporter struct {
73	uploader           batchUploader
74	stopOnce           sync.Once
75	stopCh             chan struct{}
76	defaultServiceName string
77}
78
79var _ sdktrace.SpanExporter = (*Exporter)(nil)
80
81// ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
82func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
83	// Return fast if context is already canceled or Exporter shutdown.
84	select {
85	case <-ctx.Done():
86		return ctx.Err()
87	case <-e.stopCh:
88		return nil
89	default:
90	}
91
92	// Cancel export if Exporter is shutdown.
93	var cancel context.CancelFunc
94	ctx, cancel = context.WithCancel(ctx)
95	defer cancel()
96	go func(ctx context.Context, cancel context.CancelFunc) {
97		select {
98		case <-ctx.Done():
99		case <-e.stopCh:
100			cancel()
101		}
102	}(ctx, cancel)
103
104	for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
105		if err := e.uploader.upload(ctx, batch); err != nil {
106			return err
107		}
108	}
109
110	return nil
111}
112
113// Shutdown stops the Exporter. This will close all connections and release
114// all resources held by the Exporter.
115func (e *Exporter) Shutdown(ctx context.Context) error {
116	// Stop any active and subsequent exports.
117	e.stopOnce.Do(func() { close(e.stopCh) })
118	select {
119	case <-ctx.Done():
120		return ctx.Err()
121	default:
122	}
123	return e.uploader.shutdown(ctx)
124}
125
126func spanToThrift(ss sdktrace.ReadOnlySpan) *gen.Span {
127	attr := ss.Attributes()
128	tags := make([]*gen.Tag, 0, len(attr))
129	for _, kv := range attr {
130		tag := keyValueToTag(kv)
131		if tag != nil {
132			tags = append(tags, tag)
133		}
134	}
135
136	if il := ss.InstrumentationLibrary(); il.Name != "" {
137		tags = append(tags, getStringTag(keyInstrumentationLibraryName, il.Name))
138		if il.Version != "" {
139			tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, il.Version))
140		}
141	}
142
143	if ss.SpanKind() != trace.SpanKindInternal {
144		tags = append(tags,
145			getStringTag(keySpanKind, ss.SpanKind().String()),
146		)
147	}
148
149	if ss.Status().Code != codes.Unset {
150		tags = append(tags, getInt64Tag(keyStatusCode, int64(ss.Status().Code)))
151		if ss.Status().Description != "" {
152			tags = append(tags, getStringTag(keyStatusMessage, ss.Status().Description))
153		}
154
155		if ss.Status().Code == codes.Error {
156			tags = append(tags, getBoolTag(keyError, true))
157		}
158	}
159
160	var logs []*gen.Log
161	for _, a := range ss.Events() {
162		nTags := len(a.Attributes)
163		if a.Name != "" {
164			nTags++
165		}
166		if a.DroppedAttributeCount != 0 {
167			nTags++
168		}
169		fields := make([]*gen.Tag, 0, nTags)
170		if a.Name != "" {
171			// If an event contains an attribute with the same key, it needs
172			// to be given precedence and overwrite this.
173			fields = append(fields, getStringTag(keyEventName, a.Name))
174		}
175		for _, kv := range a.Attributes {
176			tag := keyValueToTag(kv)
177			if tag != nil {
178				fields = append(fields, tag)
179			}
180		}
181		if a.DroppedAttributeCount != 0 {
182			fields = append(fields, getInt64Tag(keyDroppedAttributeCount, int64(a.DroppedAttributeCount)))
183		}
184		logs = append(logs, &gen.Log{
185			Timestamp: a.Time.UnixNano() / 1000,
186			Fields:    fields,
187		})
188	}
189
190	var refs []*gen.SpanRef
191	for _, link := range ss.Links() {
192		tid := link.SpanContext.TraceID()
193		sid := link.SpanContext.SpanID()
194		refs = append(refs, &gen.SpanRef{
195			TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])),
196			TraceIdLow:  int64(binary.BigEndian.Uint64(tid[8:16])),
197			SpanId:      int64(binary.BigEndian.Uint64(sid[:])),
198			RefType:     gen.SpanRefType_FOLLOWS_FROM,
199		})
200	}
201
202	tid := ss.SpanContext().TraceID()
203	sid := ss.SpanContext().SpanID()
204	psid := ss.Parent().SpanID()
205	return &gen.Span{
206		TraceIdHigh:   int64(binary.BigEndian.Uint64(tid[0:8])),
207		TraceIdLow:    int64(binary.BigEndian.Uint64(tid[8:16])),
208		SpanId:        int64(binary.BigEndian.Uint64(sid[:])),
209		ParentSpanId:  int64(binary.BigEndian.Uint64(psid[:])),
210		OperationName: ss.Name(), // TODO: if span kind is added then add prefix "Sent"/"Recv"
211		Flags:         int32(ss.SpanContext().TraceFlags()),
212		StartTime:     ss.StartTime().UnixNano() / 1000,
213		Duration:      ss.EndTime().Sub(ss.StartTime()).Nanoseconds() / 1000,
214		Tags:          tags,
215		Logs:          logs,
216		References:    refs,
217	}
218}
219
220func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag {
221	var tag *gen.Tag
222	switch keyValue.Value.Type() {
223	case attribute.STRING:
224		s := keyValue.Value.AsString()
225		tag = &gen.Tag{
226			Key:   string(keyValue.Key),
227			VStr:  &s,
228			VType: gen.TagType_STRING,
229		}
230	case attribute.BOOL:
231		b := keyValue.Value.AsBool()
232		tag = &gen.Tag{
233			Key:   string(keyValue.Key),
234			VBool: &b,
235			VType: gen.TagType_BOOL,
236		}
237	case attribute.INT64:
238		i := keyValue.Value.AsInt64()
239		tag = &gen.Tag{
240			Key:   string(keyValue.Key),
241			VLong: &i,
242			VType: gen.TagType_LONG,
243		}
244	case attribute.FLOAT64:
245		f := keyValue.Value.AsFloat64()
246		tag = &gen.Tag{
247			Key:     string(keyValue.Key),
248			VDouble: &f,
249			VType:   gen.TagType_DOUBLE,
250		}
251	case attribute.BOOLSLICE,
252		attribute.INT64SLICE,
253		attribute.FLOAT64SLICE,
254		attribute.STRINGSLICE:
255		json, _ := json.Marshal(keyValue.Value.AsInterface())
256		a := (string)(json)
257		tag = &gen.Tag{
258			Key:   string(keyValue.Key),
259			VStr:  &a,
260			VType: gen.TagType_STRING,
261		}
262	}
263	return tag
264}
265
266func getInt64Tag(k string, i int64) *gen.Tag {
267	return &gen.Tag{
268		Key:   k,
269		VLong: &i,
270		VType: gen.TagType_LONG,
271	}
272}
273
274func getStringTag(k, s string) *gen.Tag {
275	return &gen.Tag{
276		Key:   k,
277		VStr:  &s,
278		VType: gen.TagType_STRING,
279	}
280}
281
282func getBoolTag(k string, b bool) *gen.Tag {
283	return &gen.Tag{
284		Key:   k,
285		VBool: &b,
286		VType: gen.TagType_BOOL,
287	}
288}
289
290// jaegerBatchList transforms a slice of spans into a slice of jaeger Batch.
291func jaegerBatchList(ssl []sdktrace.ReadOnlySpan, defaultServiceName string) []*gen.Batch {
292	if len(ssl) == 0 {
293		return nil
294	}
295
296	batchDict := make(map[attribute.Distinct]*gen.Batch)
297
298	for _, ss := range ssl {
299		if ss == nil {
300			continue
301		}
302
303		resourceKey := ss.Resource().Equivalent()
304		batch, bOK := batchDict[resourceKey]
305		if !bOK {
306			batch = &gen.Batch{
307				Process: process(ss.Resource(), defaultServiceName),
308				Spans:   []*gen.Span{},
309			}
310		}
311		batch.Spans = append(batch.Spans, spanToThrift(ss))
312		batchDict[resourceKey] = batch
313	}
314
315	// Transform the categorized map into a slice
316	batchList := make([]*gen.Batch, 0, len(batchDict))
317	for _, batch := range batchDict {
318		batchList = append(batchList, batch)
319	}
320	return batchList
321}
322
323// process transforms an OTel Resource into a jaeger Process.
324func process(res *resource.Resource, defaultServiceName string) *gen.Process {
325	var process gen.Process
326
327	var serviceName attribute.KeyValue
328	if res != nil {
329		for iter := res.Iter(); iter.Next(); {
330			if iter.Attribute().Key == semconv.ServiceNameKey {
331				serviceName = iter.Attribute()
332				// Don't convert service.name into tag.
333				continue
334			}
335			if tag := keyValueToTag(iter.Attribute()); tag != nil {
336				process.Tags = append(process.Tags, tag)
337			}
338		}
339	}
340
341	// If no service.name is contained in a Span's Resource,
342	// that field MUST be populated from the default Resource.
343	if serviceName.Value.AsString() == "" {
344		serviceName = semconv.ServiceNameKey.String(defaultServiceName)
345	}
346	process.ServiceName = serviceName.Value.AsString()
347
348	return &process
349}
350