1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger // import "go.opentelemetry.io/otel/exporters/trace/jaeger"
16
17import (
18	"context"
19	"encoding/binary"
20	"encoding/json"
21	"fmt"
22	"sync"
23
24	"go.opentelemetry.io/otel"
25	"go.opentelemetry.io/otel/attribute"
26	"go.opentelemetry.io/otel/codes"
27	gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
28	"go.opentelemetry.io/otel/sdk/resource"
29	sdktrace "go.opentelemetry.io/otel/sdk/trace"
30	"go.opentelemetry.io/otel/semconv"
31	"go.opentelemetry.io/otel/trace"
32)
33
// Keys of the Jaeger tags and log fields that this exporter synthesizes in
// addition to the attributes carried by spans and events.
const (
	// Span tags holding the instrumentation library name and version.
	keyInstrumentationLibraryName    = "otel.library.name"
	keyInstrumentationLibraryVersion = "otel.library.version"

	// Boolean span tag added when the span status code is codes.Error.
	keyError = "error"

	// Span tag holding the span kind; omitted for internal spans.
	keySpanKind = "span.kind"

	// Span tags holding the status code and message; omitted when the
	// status code is unset.
	keyStatusCode    = "otel.status_code"
	keyStatusMessage = "otel.status_description"

	// Log field holding an event's dropped attribute count when non-zero.
	keyDroppedAttributeCount = "otel.event.dropped_attributes_count"

	// Log field holding an event's name when non-empty.
	keyEventName = "event"
)
44
45// NewRawExporter returns an OTel Exporter implementation that exports the
46// collected spans to Jaeger.
47func NewRawExporter(endpointOption EndpointOption) (*Exporter, error) {
48	uploader, err := endpointOption()
49	if err != nil {
50		return nil, err
51	}
52
53	// Fetch default service.name from default resource for backup
54	var defaultServiceName string
55	defaultResource := resource.Default()
56	if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists {
57		defaultServiceName = value.AsString()
58	}
59	if defaultServiceName == "" {
60		return nil, fmt.Errorf("failed to get service name from default resource")
61	}
62
63	stopCh := make(chan struct{})
64	e := &Exporter{
65		uploader:           uploader,
66		stopCh:             stopCh,
67		defaultServiceName: defaultServiceName,
68	}
69	return e, nil
70}
71
72// NewExportPipeline sets up a complete export pipeline
73// with the recommended setup for trace provider
74func NewExportPipeline(endpointOption EndpointOption) (*sdktrace.TracerProvider, error) {
75	exporter, err := NewRawExporter(endpointOption)
76	if err != nil {
77		return nil, err
78	}
79
80	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
81	return tp, nil
82}
83
84// InstallNewPipeline instantiates a NewExportPipeline with the
85// recommended configuration and registers it globally.
86func InstallNewPipeline(endpointOption EndpointOption) (*sdktrace.TracerProvider, error) {
87	tp, err := NewExportPipeline(endpointOption)
88	if err != nil {
89		return tp, err
90	}
91
92	otel.SetTracerProvider(tp)
93	return tp, nil
94}
95
96// Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
97type Exporter struct {
98	uploader           batchUploader
99	stopOnce           sync.Once
100	stopCh             chan struct{}
101	defaultServiceName string
102}
103
104var _ sdktrace.SpanExporter = (*Exporter)(nil)
105
106// ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
107func (e *Exporter) ExportSpans(ctx context.Context, spans []*sdktrace.SpanSnapshot) error {
108	// Return fast if context is already canceled or Exporter shutdown.
109	select {
110	case <-ctx.Done():
111		return ctx.Err()
112	case <-e.stopCh:
113		return nil
114	default:
115	}
116
117	// Cancel export if Exporter is shutdown.
118	var cancel context.CancelFunc
119	ctx, cancel = context.WithCancel(ctx)
120	defer cancel()
121	go func(ctx context.Context, cancel context.CancelFunc) {
122		select {
123		case <-ctx.Done():
124		case <-e.stopCh:
125			cancel()
126		}
127	}(ctx, cancel)
128
129	for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
130		if err := e.uploader.upload(ctx, batch); err != nil {
131			return err
132		}
133	}
134
135	return nil
136}
137
138// Shutdown stops the Exporter. This will close all connections and release
139// all resources held by the Exporter.
140func (e *Exporter) Shutdown(ctx context.Context) error {
141	// Stop any active and subsequent exports.
142	e.stopOnce.Do(func() { close(e.stopCh) })
143	select {
144	case <-ctx.Done():
145		return ctx.Err()
146	default:
147	}
148	return e.uploader.shutdown(ctx)
149}
150
151func spanSnapshotToThrift(ss *sdktrace.SpanSnapshot) *gen.Span {
152	tags := make([]*gen.Tag, 0, len(ss.Attributes))
153	for _, kv := range ss.Attributes {
154		tag := keyValueToTag(kv)
155		if tag != nil {
156			tags = append(tags, tag)
157		}
158	}
159
160	if il := ss.InstrumentationLibrary; il.Name != "" {
161		tags = append(tags, getStringTag(keyInstrumentationLibraryName, il.Name))
162		if il.Version != "" {
163			tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, il.Version))
164		}
165	}
166
167	if ss.SpanKind != trace.SpanKindInternal {
168		tags = append(tags,
169			getStringTag(keySpanKind, ss.SpanKind.String()),
170		)
171	}
172
173	if ss.StatusCode != codes.Unset {
174		tags = append(tags, getInt64Tag(keyStatusCode, int64(ss.StatusCode)))
175		if ss.StatusMessage != "" {
176			tags = append(tags, getStringTag(keyStatusMessage, ss.StatusMessage))
177		}
178
179		if ss.StatusCode == codes.Error {
180			tags = append(tags, getBoolTag(keyError, true))
181		}
182	}
183
184	var logs []*gen.Log
185	for _, a := range ss.MessageEvents {
186		nTags := len(a.Attributes)
187		if a.Name != "" {
188			nTags++
189		}
190		if a.DroppedAttributeCount != 0 {
191			nTags++
192		}
193		fields := make([]*gen.Tag, 0, nTags)
194		if a.Name != "" {
195			// If an event contains an attribute with the same key, it needs
196			// to be given precedence and overwrite this.
197			fields = append(fields, getStringTag(keyEventName, a.Name))
198		}
199		for _, kv := range a.Attributes {
200			tag := keyValueToTag(kv)
201			if tag != nil {
202				fields = append(fields, tag)
203			}
204		}
205		if a.DroppedAttributeCount != 0 {
206			fields = append(fields, getInt64Tag(keyDroppedAttributeCount, int64(a.DroppedAttributeCount)))
207		}
208		logs = append(logs, &gen.Log{
209			Timestamp: a.Time.UnixNano() / 1000,
210			Fields:    fields,
211		})
212	}
213
214	var refs []*gen.SpanRef
215	for _, link := range ss.Links {
216		tid := link.TraceID()
217		sid := link.SpanID()
218		refs = append(refs, &gen.SpanRef{
219			TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])),
220			TraceIdLow:  int64(binary.BigEndian.Uint64(tid[8:16])),
221			SpanId:      int64(binary.BigEndian.Uint64(sid[:])),
222			RefType:     gen.SpanRefType_FOLLOWS_FROM,
223		})
224	}
225
226	tid := ss.SpanContext.TraceID()
227	sid := ss.SpanContext.SpanID()
228	psid := ss.Parent.SpanID()
229	return &gen.Span{
230		TraceIdHigh:   int64(binary.BigEndian.Uint64(tid[0:8])),
231		TraceIdLow:    int64(binary.BigEndian.Uint64(tid[8:16])),
232		SpanId:        int64(binary.BigEndian.Uint64(sid[:])),
233		ParentSpanId:  int64(binary.BigEndian.Uint64(psid[:])),
234		OperationName: ss.Name, // TODO: if span kind is added then add prefix "Sent"/"Recv"
235		Flags:         int32(ss.SpanContext.TraceFlags()),
236		StartTime:     ss.StartTime.UnixNano() / 1000,
237		Duration:      ss.EndTime.Sub(ss.StartTime).Nanoseconds() / 1000,
238		Tags:          tags,
239		Logs:          logs,
240		References:    refs,
241	}
242}
243
244func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag {
245	var tag *gen.Tag
246	switch keyValue.Value.Type() {
247	case attribute.STRING:
248		s := keyValue.Value.AsString()
249		tag = &gen.Tag{
250			Key:   string(keyValue.Key),
251			VStr:  &s,
252			VType: gen.TagType_STRING,
253		}
254	case attribute.BOOL:
255		b := keyValue.Value.AsBool()
256		tag = &gen.Tag{
257			Key:   string(keyValue.Key),
258			VBool: &b,
259			VType: gen.TagType_BOOL,
260		}
261	case attribute.INT64:
262		i := keyValue.Value.AsInt64()
263		tag = &gen.Tag{
264			Key:   string(keyValue.Key),
265			VLong: &i,
266			VType: gen.TagType_LONG,
267		}
268	case attribute.FLOAT64:
269		f := keyValue.Value.AsFloat64()
270		tag = &gen.Tag{
271			Key:     string(keyValue.Key),
272			VDouble: &f,
273			VType:   gen.TagType_DOUBLE,
274		}
275	case attribute.ARRAY:
276		json, _ := json.Marshal(keyValue.Value.AsArray())
277		a := (string)(json)
278		tag = &gen.Tag{
279			Key:   string(keyValue.Key),
280			VStr:  &a,
281			VType: gen.TagType_STRING,
282		}
283	}
284	return tag
285}
286
287func getInt64Tag(k string, i int64) *gen.Tag {
288	return &gen.Tag{
289		Key:   k,
290		VLong: &i,
291		VType: gen.TagType_LONG,
292	}
293}
294
295func getStringTag(k, s string) *gen.Tag {
296	return &gen.Tag{
297		Key:   k,
298		VStr:  &s,
299		VType: gen.TagType_STRING,
300	}
301}
302
303func getBoolTag(k string, b bool) *gen.Tag {
304	return &gen.Tag{
305		Key:   k,
306		VBool: &b,
307		VType: gen.TagType_BOOL,
308	}
309}
310
311// jaegerBatchList transforms a slice of SpanSnapshot into a slice of jaeger
312// Batch.
313func jaegerBatchList(ssl []*sdktrace.SpanSnapshot, defaultServiceName string) []*gen.Batch {
314	if len(ssl) == 0 {
315		return nil
316	}
317
318	batchDict := make(map[attribute.Distinct]*gen.Batch)
319
320	for _, ss := range ssl {
321		if ss == nil {
322			continue
323		}
324
325		resourceKey := ss.Resource.Equivalent()
326		batch, bOK := batchDict[resourceKey]
327		if !bOK {
328			batch = &gen.Batch{
329				Process: process(ss.Resource, defaultServiceName),
330				Spans:   []*gen.Span{},
331			}
332		}
333		batch.Spans = append(batch.Spans, spanSnapshotToThrift(ss))
334		batchDict[resourceKey] = batch
335	}
336
337	// Transform the categorized map into a slice
338	batchList := make([]*gen.Batch, 0, len(batchDict))
339	for _, batch := range batchDict {
340		batchList = append(batchList, batch)
341	}
342	return batchList
343}
344
345// process transforms an OTel Resource into a jaeger Process.
346func process(res *resource.Resource, defaultServiceName string) *gen.Process {
347	var process gen.Process
348
349	var serviceName attribute.KeyValue
350	if res != nil {
351		for iter := res.Iter(); iter.Next(); {
352			if iter.Attribute().Key == semconv.ServiceNameKey {
353				serviceName = iter.Attribute()
354				// Don't convert service.name into tag.
355				continue
356			}
357			if tag := keyValueToTag(iter.Attribute()); tag != nil {
358				process.Tags = append(process.Tags, tag)
359			}
360		}
361	}
362
363	// If no service.name is contained in a Span's Resource,
364	// that field MUST be populated from the default Resource.
365	if serviceName.Value.AsString() == "" {
366		serviceName = semconv.ServiceNameKey.String(defaultServiceName)
367	}
368	process.ServiceName = serviceName.Value.AsString()
369
370	return &process
371}
372