1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger // import "go.opentelemetry.io/otel/exporters/trace/jaeger"
16
17import (
18	"context"
19	"encoding/binary"
20	"fmt"
21	"sync"
22
23	"google.golang.org/api/support/bundler"
24
25	"go.opentelemetry.io/otel"
26	"go.opentelemetry.io/otel/codes"
27	gen "go.opentelemetry.io/otel/exporters/trace/jaeger/internal/gen-go/jaeger"
28	"go.opentelemetry.io/otel/label"
29	export "go.opentelemetry.io/otel/sdk/export/trace"
30	sdktrace "go.opentelemetry.io/otel/sdk/trace"
31	"go.opentelemetry.io/otel/trace"
32)
33
const (
	// defaultServiceName is the Jaeger service name NewRawExporter falls back
	// to when the configured Process has an empty ServiceName.
	defaultServiceName = "OpenTelemetry"

	// keyInstrumentationLibraryName and keyInstrumentationLibraryVersion are
	// the tag keys under which spanDataToThrift records the name and version
	// of the instrumentation library attached to a span.
	keyInstrumentationLibraryName    = "otel.instrumentation_library.name"
	keyInstrumentationLibraryVersion = "otel.instrumentation_library.version"
)
40
// Option configures the Jaeger exporter by mutating the options value it is
// handed. Options are applied in the order they are supplied.
type Option func(*options)
42
// options are the options to be used when initializing a Jaeger export.
type options struct {
	// Process contains the information about the exporting process.
	Process Process

	// BufferMaxCount defines the total number of spans that can be buffered
	// in memory. Every span is added to the bundler with a size of 1, so this
	// value is used as the bundler's BufferedByteLimit. Zero keeps the
	// bundler's default.
	BufferMaxCount int

	// BatchMaxCount defines the maximum number of spans sent in one batch.
	// Zero keeps the bundler's default.
	BatchMaxCount int

	// Config, when non-nil, is the SDK configuration NewExportPipeline passes
	// to the TracerProvider it creates.
	Config *sdktrace.Config

	// Disabled makes NewExportPipeline return a no-op TracerProvider.
	// NewRawExporter ignores it.
	Disabled bool
}
58
59// WithProcess sets the process with the information about the exporting process.
60func WithProcess(process Process) Option {
61	return func(o *options) {
62		o.Process = process
63	}
64}
65
66// WithBufferMaxCount defines the total number of traces that can be buffered in memory
67func WithBufferMaxCount(bufferMaxCount int) Option {
68	return func(o *options) {
69		o.BufferMaxCount = bufferMaxCount
70	}
71}
72
73// WithBatchMaxCount defines the maximum number of spans in one batch
74func WithBatchMaxCount(batchMaxCount int) Option {
75	return func(o *options) {
76		o.BatchMaxCount = batchMaxCount
77	}
78}
79
80// WithSDK sets the SDK config for the exporter pipeline.
81func WithSDK(config *sdktrace.Config) Option {
82	return func(o *options) {
83		o.Config = config
84	}
85}
86
87// WithDisabled option will cause pipeline methods to use
88// a no-op provider
89func WithDisabled(disabled bool) Option {
90	return func(o *options) {
91		o.Disabled = disabled
92	}
93}
94
95// NewRawExporter returns an OTel Exporter implementation that exports the
96// collected spans to Jaeger.
97//
98// It will IGNORE Disabled option.
99func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
100	uploader, err := endpointOption()
101	if err != nil {
102		return nil, err
103	}
104
105	o := options{}
106	opts = append(opts, WithProcessFromEnv())
107	for _, opt := range opts {
108		opt(&o)
109	}
110
111	service := o.Process.ServiceName
112	if service == "" {
113		service = defaultServiceName
114	}
115	tags := make([]*gen.Tag, 0, len(o.Process.Tags))
116	for _, tag := range o.Process.Tags {
117		t := keyValueToTag(tag)
118		if t != nil {
119			tags = append(tags, t)
120		}
121	}
122	e := &Exporter{
123		uploader: uploader,
124		process: &gen.Process{
125			ServiceName: service,
126			Tags:        tags,
127		},
128		o: o,
129	}
130	bundler := bundler.NewBundler((*gen.Span)(nil), func(bundle interface{}) {
131		if err := e.upload(bundle.([]*gen.Span)); err != nil {
132			otel.Handle(err)
133		}
134	})
135
136	// Set BufferedByteLimit with the total number of spans that are permissible to be held in memory.
137	// This needs to be done since the size of messages is always set to 1. Failing to set this would allow
138	// 1G messages to be held in memory since that is the default value of BufferedByteLimit.
139	if o.BufferMaxCount != 0 {
140		bundler.BufferedByteLimit = o.BufferMaxCount
141	}
142
143	// The default value bundler uses is 10, increase to send larger batches
144	if o.BatchMaxCount != 0 {
145		bundler.BundleCountThreshold = o.BatchMaxCount
146	}
147
148	e.bundler = bundler
149	return e, nil
150}
151
152// NewExportPipeline sets up a complete export pipeline
153// with the recommended setup for trace provider
154func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (trace.TracerProvider, func(), error) {
155	o := options{}
156	opts = append(opts, WithDisabledFromEnv())
157	for _, opt := range opts {
158		opt(&o)
159	}
160	if o.Disabled {
161		return trace.NewNoopTracerProvider(), func() {}, nil
162	}
163
164	exporter, err := NewRawExporter(endpointOption, opts...)
165	if err != nil {
166		return nil, nil, err
167	}
168
169	pOpts := []sdktrace.TracerProviderOption{sdktrace.WithSyncer(exporter)}
170	if exporter.o.Config != nil {
171		pOpts = append(pOpts, sdktrace.WithConfig(*exporter.o.Config))
172	}
173	tp := sdktrace.NewTracerProvider(pOpts...)
174	return tp, exporter.Flush, nil
175}
176
177// InstallNewPipeline instantiates a NewExportPipeline with the
178// recommended configuration and registers it globally.
179func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (func(), error) {
180	tp, flushFn, err := NewExportPipeline(endpointOption, opts...)
181	if err != nil {
182		return nil, err
183	}
184
185	otel.SetTracerProvider(tp)
186	return flushFn, nil
187}
188
// Process contains the information exported to jaeger about the source
// of the trace data.
type Process struct {
	// ServiceName is the Jaeger service name. When empty, NewRawExporter
	// substitutes defaultServiceName.
	ServiceName string

	// Tags are added to Jaeger Process exports. NewRawExporter converts each
	// one with keyValueToTag and drops those that cannot be converted.
	Tags []label.KeyValue
}
198
// Exporter is an implementation of an OTel SpanSyncer that uploads spans to
// Jaeger.
type Exporter struct {
	// process is attached to every batch built by upload.
	process *gen.Process
	// bundler buffers the spans added by ExportSpans and hands completed
	// bundles to upload.
	bundler *bundler.Bundler
	// uploader sends a finished batch to its destination.
	uploader batchUploader
	// o holds the options the exporter was created with.
	o options

	// stoppedMu guards stopped.
	stoppedMu sync.RWMutex
	// stopped is set by Shutdown; once true, ExportSpans silently discards
	// the spans it is given.
	stopped bool
}
210
// Compile-time assertion that *Exporter satisfies export.SpanExporter.
var _ export.SpanExporter = (*Exporter)(nil)
212
213// ExportSpans exports SpanData to Jaeger.
214func (e *Exporter) ExportSpans(ctx context.Context, spans []*export.SpanData) error {
215	e.stoppedMu.RLock()
216	stopped := e.stopped
217	e.stoppedMu.RUnlock()
218	if stopped {
219		return nil
220	}
221
222	for _, span := range spans {
223		// TODO(jbd): Handle oversized bundlers.
224		err := e.bundler.Add(spanDataToThrift(span), 1)
225		if err != nil {
226			return fmt.Errorf("failed to bundle %q: %w", span.Name, err)
227		}
228	}
229	return nil
230}
231
// flush is used to wrap the bundler's Flush method for testing.
//
// It is a package-level variable so it can be replaced; both Flush and
// Shutdown go through it rather than calling the bundler directly.
var flush = func(e *Exporter) {
	e.bundler.Flush()
}
236
237// Shutdown stops the exporter flushing any pending exports.
238func (e *Exporter) Shutdown(ctx context.Context) error {
239	e.stoppedMu.Lock()
240	e.stopped = true
241	e.stoppedMu.Unlock()
242
243	done := make(chan struct{}, 1)
244	// Shadow so if the goroutine is leaked in testing it doesn't cause a race
245	// condition when the file level var is reset.
246	go func(FlushFunc func(*Exporter)) {
247		// The OpenTelemetry specification is explicit in not having this
248		// method block so the preference here is to orphan this goroutine if
249		// the context is canceled or times out while this flushing is
250		// occurring. This is a consequence of the bundler Flush method not
251		// supporting a context.
252		FlushFunc(e)
253		done <- struct{}{}
254	}(flush)
255	select {
256	case <-ctx.Done():
257		return ctx.Err()
258	case <-done:
259	}
260	return nil
261}
262
263func spanDataToThrift(data *export.SpanData) *gen.Span {
264	tags := make([]*gen.Tag, 0, len(data.Attributes))
265	for _, kv := range data.Attributes {
266		tag := keyValueToTag(kv)
267		if tag != nil {
268			tags = append(tags, tag)
269		}
270	}
271
272	// TODO (jmacd): OTel has a broad "last value wins"
273	// semantic. Should resources be appended before span
274	// attributes, above, to allow span attributes to
275	// overwrite resource attributes?
276	if data.Resource != nil {
277		for iter := data.Resource.Iter(); iter.Next(); {
278			if tag := keyValueToTag(iter.Attribute()); tag != nil {
279				tags = append(tags, tag)
280			}
281		}
282	}
283	if il := data.InstrumentationLibrary; il.Name != "" {
284		tags = append(tags, getStringTag(keyInstrumentationLibraryName, il.Name))
285		if il.Version != "" {
286			tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, il.Version))
287		}
288	}
289
290	tags = append(tags,
291		getInt64Tag("status.code", int64(data.StatusCode)),
292		getStringTag("status.message", data.StatusMessage),
293		getStringTag("span.kind", data.SpanKind.String()),
294	)
295
296	// Ensure that if Status.Code is not OK, that we set the "error" tag on the Jaeger span.
297	// See Issue https://github.com/census-instrumentation/opencensus-go/issues/1041
298	if data.StatusCode != codes.Ok && data.StatusCode != codes.Unset {
299		tags = append(tags, getBoolTag("error", true))
300	}
301
302	var logs []*gen.Log
303	for _, a := range data.MessageEvents {
304		fields := make([]*gen.Tag, 0, len(a.Attributes))
305		for _, kv := range a.Attributes {
306			tag := keyValueToTag(kv)
307			if tag != nil {
308				fields = append(fields, tag)
309			}
310		}
311		fields = append(fields, getStringTag("name", a.Name))
312		logs = append(logs, &gen.Log{
313			Timestamp: a.Time.UnixNano() / 1000,
314			Fields:    fields,
315		})
316	}
317
318	var refs []*gen.SpanRef
319	for _, link := range data.Links {
320		refs = append(refs, &gen.SpanRef{
321			TraceIdHigh: int64(binary.BigEndian.Uint64(link.TraceID[0:8])),
322			TraceIdLow:  int64(binary.BigEndian.Uint64(link.TraceID[8:16])),
323			SpanId:      int64(binary.BigEndian.Uint64(link.SpanID[:])),
324			// TODO(paivagustavo): properly set the reference type when specs are defined
325			//  see https://github.com/open-telemetry/opentelemetry-specification/issues/65
326			RefType: gen.SpanRefType_CHILD_OF,
327		})
328	}
329
330	return &gen.Span{
331		TraceIdHigh:   int64(binary.BigEndian.Uint64(data.SpanContext.TraceID[0:8])),
332		TraceIdLow:    int64(binary.BigEndian.Uint64(data.SpanContext.TraceID[8:16])),
333		SpanId:        int64(binary.BigEndian.Uint64(data.SpanContext.SpanID[:])),
334		ParentSpanId:  int64(binary.BigEndian.Uint64(data.ParentSpanID[:])),
335		OperationName: data.Name, // TODO: if span kind is added then add prefix "Sent"/"Recv"
336		Flags:         int32(data.SpanContext.TraceFlags),
337		StartTime:     data.StartTime.UnixNano() / 1000,
338		Duration:      data.EndTime.Sub(data.StartTime).Nanoseconds() / 1000,
339		Tags:          tags,
340		Logs:          logs,
341		References:    refs,
342	}
343}
344
345func keyValueToTag(keyValue label.KeyValue) *gen.Tag {
346	var tag *gen.Tag
347	switch keyValue.Value.Type() {
348	case label.STRING:
349		s := keyValue.Value.AsString()
350		tag = &gen.Tag{
351			Key:   string(keyValue.Key),
352			VStr:  &s,
353			VType: gen.TagType_STRING,
354		}
355	case label.BOOL:
356		b := keyValue.Value.AsBool()
357		tag = &gen.Tag{
358			Key:   string(keyValue.Key),
359			VBool: &b,
360			VType: gen.TagType_BOOL,
361		}
362	case label.INT32:
363		i := int64(keyValue.Value.AsInt32())
364		tag = &gen.Tag{
365			Key:   string(keyValue.Key),
366			VLong: &i,
367			VType: gen.TagType_LONG,
368		}
369	case label.INT64:
370		i := keyValue.Value.AsInt64()
371		tag = &gen.Tag{
372			Key:   string(keyValue.Key),
373			VLong: &i,
374			VType: gen.TagType_LONG,
375		}
376	case label.UINT32:
377		i := int64(keyValue.Value.AsUint32())
378		tag = &gen.Tag{
379			Key:   string(keyValue.Key),
380			VLong: &i,
381			VType: gen.TagType_LONG,
382		}
383	case label.UINT64:
384		// we'll ignore the value if it overflows
385		if i := int64(keyValue.Value.AsUint64()); i >= 0 {
386			tag = &gen.Tag{
387				Key:   string(keyValue.Key),
388				VLong: &i,
389				VType: gen.TagType_LONG,
390			}
391		}
392	case label.FLOAT32:
393		f := float64(keyValue.Value.AsFloat32())
394		tag = &gen.Tag{
395			Key:     string(keyValue.Key),
396			VDouble: &f,
397			VType:   gen.TagType_DOUBLE,
398		}
399	case label.FLOAT64:
400		f := keyValue.Value.AsFloat64()
401		tag = &gen.Tag{
402			Key:     string(keyValue.Key),
403			VDouble: &f,
404			VType:   gen.TagType_DOUBLE,
405		}
406	}
407	return tag
408}
409
410func getInt64Tag(k string, i int64) *gen.Tag {
411	return &gen.Tag{
412		Key:   k,
413		VLong: &i,
414		VType: gen.TagType_LONG,
415	}
416}
417
418func getStringTag(k, s string) *gen.Tag {
419	return &gen.Tag{
420		Key:   k,
421		VStr:  &s,
422		VType: gen.TagType_STRING,
423	}
424}
425
426func getBoolTag(k string, b bool) *gen.Tag {
427	return &gen.Tag{
428		Key:   k,
429		VBool: &b,
430		VType: gen.TagType_BOOL,
431	}
432}
433
// Flush waits for exported trace spans to be uploaded.
//
// This is useful if your program is ending and you do not want to lose recent spans.
//
// It delegates to the package-level flush hook, which by default calls the
// bundler's Flush method.
func (e *Exporter) Flush() {
	flush(e)
}
440
441func (e *Exporter) upload(spans []*gen.Span) error {
442	batch := &gen.Batch{
443		Spans:   spans,
444		Process: e.process,
445	}
446
447	return e.uploader.upload(batch)
448}
449