1// Package lightstep implements the LightStep OpenTracing client for Go.
2package lightstep
3
4import (
5	"context"
6	"fmt"
7	"runtime"
8	"sync"
9	"time"
10
11	"github.com/opentracing/opentracing-go"
12)
13
14// Tracer extends the `opentracing.Tracer` interface with methods for manual
15// flushing and closing. To access these methods, you can take the global
16// tracer and typecast it to a `lightstep.Tracer`. As a convenience, the
17// lightstep package provides static functions which perform the typecasting.
18type Tracer interface {
19	opentracing.Tracer
20
21	// Close flushes and then terminates the LightStep collector
22	Close(context.Context)
23	// Flush sends all spans currently in the buffer to the LighStep collector
24	Flush(context.Context)
25	// Options gets the Options used in New() or NewWithOptions().
26	Options() Options
27	// Disable prevents the tracer from recording spans or flushing
28	Disable()
29}
30
// tracerImpl implements the `Tracer` interface. It buffers finished spans and
// forwards them to a LightStep collector.
type tracerImpl struct {
	//////////////////////////////////////////////////////////////
	// IMMUTABLE IMMUTABLE IMMUTABLE IMMUTABLE IMMUTABLE IMMUTABLE
	//////////////////////////////////////////////////////////////

	// Note: there may be a desire to update some of these fields
	// at runtime, in which case suitable changes may be needed
	// for variables accessed during Flush.

	// reporterID is generated once in CreateTracer and sent with every
	// report; opts holds the Options the tracer was created with.
	reporterID uint64 // the LightStep tracer guid
	opts       Options

	// report loop management: Close closes closeReportLoopChannel (at most
	// once, guarded by closeOnce) to ask reportLoop to stop, then waits on
	// reportLoopClosedChannel, which reportLoop closes when it stops in
	// response to closeReportLoopChannel.
	closeOnce               sync.Once
	closeReportLoopChannel  chan struct{}
	reportLoopClosedChannel chan struct{}

	// Per-report inputs handed to converter.toReportRequest on every Flush.
	converter   *protoConverter
	accessToken string
	attributes  map[string]string

	//////////////////////////////////////////////////////////
	// MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE MUTABLE
	//////////////////////////////////////////////////////////

	// Unless noted otherwise, the following fields are modified under `lock`.
	// NOTE(review): some fields below are only written from Flush (which
	// holds flushingLock, not `lock`) or only during construction; see the
	// per-field comments.
	lock sync.Mutex

	// Remote service that will receive reports.
	// client is assigned in CreateTracer and is used both with and without
	// `lock` held; connection is swapped under `lock` (reconnectClient, Close)
	// and set to nil by Close.
	client     collectorClient
	connection Connection

	// Two buffers of data. preFlush swaps them under `lock`: new spans keep
	// landing in buffer while flushing holds the batch being reported.
	buffer   reportBuffer
	flushing reportBuffer

	// Flush state. flushingLock serializes calls to Flush; reportInFlight and
	// lastReportAttempt are updated under `lock` by preFlush/postFlush.
	flushingLock      sync.Mutex
	reportInFlight    bool
	lastReportAttempt time.Time

	// Meta Event Reporting can be enabled at tracer creation or on-demand by
	// satellite (DevMode in a report response). Written by CreateTracer and by
	// Flush while it holds flushingLock, not `lock`.
	metaEventReportingEnabled bool
	// Set to true on first report. Written only by Flush (under flushingLock).
	firstReportHasRun bool

	// We allow our remote peer to disable this instrumentation at any
	// time, turning all potentially costly runtime operations into
	// no-ops.
	//
	// TODO this should use atomic load/store to test disabled
	// prior to taking the lock, do please.
	disabled bool

	// Map of propagators used to determine the correct propagator to use
	// based on the format passed into Inject/Extract. Supports one
	// propagator for each of the formats: TextMap, HTTPHeaders, Binary.
	// Populated in CreateTracer and read without `lock` afterwards.
	propagators map[opentracing.BuiltinFormat]Propagator
}
91
92// NewTracer creates and starts a new Lightstep Tracer.
93// In case of error, we emit event and return nil.
94func NewTracer(opts Options) Tracer {
95	tr, err := CreateTracer(opts)
96	if err != nil  {
97		emitEvent(newEventStartError(err))
98		return nil
99	}
100	return tr
101}
102
103// CreateTracer creates and starts a new Lightstep Tracer.
104// It is meant to replace NewTracer which does not propagate the error.
105func CreateTracer(opts Options) (Tracer, error) {
106	if err := opts.Initialize(); err != nil {
107		return nil, fmt.Errorf("init; err: %v", err)
108	}
109
110	attributes := map[string]string{}
111	for k, v := range opts.Tags {
112		attributes[k] = fmt.Sprint(v)
113	}
114	// Don't let the GrpcOptions override these values. That would be confusing.
115	attributes[TracerPlatformKey] = TracerPlatformValue
116	attributes[TracerPlatformVersionKey] = runtime.Version()
117	attributes[TracerVersionKey] = TracerVersionValue
118
119	now := time.Now()
120	impl := &tracerImpl{
121		opts:                    opts,
122		reporterID:              genSeededGUID(),
123		buffer:                  newSpansBuffer(opts.MaxBufferedSpans),
124		flushing:                newSpansBuffer(opts.MaxBufferedSpans),
125		closeReportLoopChannel:  make(chan struct{}),
126		reportLoopClosedChannel: make(chan struct{}),
127		converter:               newProtoConverter(opts),
128		accessToken:             opts.AccessToken,
129		attributes:              attributes,
130	}
131
132	impl.buffer.setCurrent(now)
133
134	var err error
135	impl.client, err = newCollectorClient(opts)
136	if err != nil {
137		return nil, fmt.Errorf("create collector client; err: %v", err)
138	}
139
140	conn, err := impl.client.ConnectClient()
141	if err != nil {
142		return nil, err
143	}
144	impl.connection = conn
145
146	// set meta reporting to defined option
147	impl.metaEventReportingEnabled = opts.MetaEventReportingEnabled
148	impl.firstReportHasRun = false
149
150	go impl.reportLoop()
151
152	impl.propagators = map[opentracing.BuiltinFormat]Propagator{
153		opentracing.TextMap:     theLightStepPropagator,
154		opentracing.HTTPHeaders: theLightStepPropagator,
155		opentracing.Binary:      theBinaryPropagator,
156	}
157
158	if opts.Propagator == "b3" {
159		impl.propagators[opentracing.TextMap] = theB3Propagator
160		impl.propagators[opentracing.HTTPHeaders] = theB3Propagator
161	}
162
163	return impl, nil
164}
165
166func (tracer *tracerImpl) Options() Options {
167	return tracer.opts
168}
169
170func (tracer *tracerImpl) StartSpan(
171	operationName string,
172	sso ...opentracing.StartSpanOption,
173) opentracing.Span {
174	return newSpan(operationName, tracer, sso)
175}
176
177func (tracer *tracerImpl) Inject(sc opentracing.SpanContext, format interface{}, carrier interface{}) error {
178	if tracer.opts.MetaEventReportingEnabled {
179		opentracing.StartSpan(LSMetaEvent_InjectOperation,
180			opentracing.Tag{Key: LSMetaEvent_MetaEventKey, Value: true},
181			opentracing.Tag{Key: LSMetaEvent_TraceIdKey, Value: sc.(SpanContext).TraceID},
182			opentracing.Tag{Key: LSMetaEvent_SpanIdKey, Value: sc.(SpanContext).SpanID},
183			opentracing.Tag{Key: LSMetaEvent_PropagationFormatKey, Value: format}).
184			Finish()
185	}
186
187	builtin, ok := format.(opentracing.BuiltinFormat)
188	if !ok {
189		return opentracing.ErrUnsupportedFormat
190	}
191	return tracer.propagators[builtin].Inject(sc, carrier)
192}
193
194func (tracer *tracerImpl) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
195	if tracer.opts.MetaEventReportingEnabled {
196		opentracing.StartSpan(LSMetaEvent_ExtractOperation,
197			opentracing.Tag{Key: LSMetaEvent_MetaEventKey, Value: true},
198			opentracing.Tag{Key: LSMetaEvent_PropagationFormatKey, Value: format}).
199			Finish()
200	}
201	builtin, ok := format.(opentracing.BuiltinFormat)
202	if !ok {
203		return nil, opentracing.ErrUnsupportedFormat
204	}
205	return tracer.propagators[builtin].Extract(carrier)
206}
207
208func (tracer *tracerImpl) reconnectClient(now time.Time) {
209	conn, err := tracer.client.ConnectClient()
210	if err != nil {
211		emitEvent(newEventConnectionError(err))
212	} else {
213		tracer.lock.Lock()
214		oldConn := tracer.connection
215		tracer.connection = conn
216		tracer.lock.Unlock()
217
218		oldConn.Close()
219	}
220}
221
222// Close flushes and then terminates the LightStep collector. Close may only be
223// called once; subsequent calls to Close are no-ops.
224func (tracer *tracerImpl) Close(ctx context.Context) {
225	tracer.closeOnce.Do(func() {
226		// notify report loop that we are closing
227		close(tracer.closeReportLoopChannel)
228		select {
229		case <-tracer.reportLoopClosedChannel:
230			tracer.Flush(ctx)
231		case <-ctx.Done():
232			return
233		}
234
235		// now its safe to close the connection
236		tracer.lock.Lock()
237		conn := tracer.connection
238		tracer.connection = nil
239		tracer.lock.Unlock()
240
241		if conn != nil {
242			err := conn.Close()
243			if err != nil {
244				emitEvent(newEventConnectionError(err))
245			}
246		}
247	})
248}
249
250// RecordSpan records a finished Span.
251func (tracer *tracerImpl) RecordSpan(raw RawSpan) {
252	tracer.lock.Lock()
253
254	// Early-out for disabled runtimes
255	if tracer.disabled {
256		tracer.lock.Unlock()
257		return
258	}
259
260	tracer.buffer.addSpan(raw)
261	tracer.lock.Unlock()
262
263	if tracer.opts.Recorder != nil {
264		tracer.opts.Recorder.RecordSpan(raw)
265	}
266}
267
268// Flush sends all buffered data to the collector.
269func (tracer *tracerImpl) Flush(ctx context.Context) {
270	tracer.flushingLock.Lock()
271	defer tracer.flushingLock.Unlock()
272
273	if errorEvent := tracer.preFlush(); errorEvent != nil {
274		emitEvent(errorEvent)
275		return
276	}
277
278	if tracer.opts.MetaEventReportingEnabled && !tracer.firstReportHasRun {
279		opentracing.StartSpan(LSMetaEvent_TracerCreateOperation,
280			opentracing.Tag{Key: LSMetaEvent_MetaEventKey, Value: true},
281			opentracing.Tag{Key: LSMetaEvent_TracerGuidKey, Value: tracer.reporterID}).
282			Finish()
283		tracer.firstReportHasRun = true
284	}
285
286	ctx, cancel := context.WithTimeout(ctx, tracer.opts.ReportTimeout)
287	defer cancel()
288
289	protoReq := tracer.converter.toReportRequest(
290		tracer.reporterID,
291		tracer.attributes,
292		tracer.accessToken,
293		&tracer.flushing,
294	)
295	req, err := tracer.client.Translate(protoReq)
296	if err != nil {
297		errorEvent := newEventFlushError(err, FlushErrorTranslate)
298		emitEvent(errorEvent)
299		// call postflush to prevent the tracer from going into an invalid state.
300		emitEvent(tracer.postFlush(errorEvent))
301		return
302	}
303
304	var reportErrorEvent *eventFlushError
305	resp, err := tracer.client.Report(ctx, req)
306	if err != nil {
307		reportErrorEvent = newEventFlushError(err, FlushErrorTransport)
308	} else if len(resp.GetErrors()) > 0 {
309		reportErrorEvent = newEventFlushError(fmt.Errorf(resp.GetErrors()[0]), FlushErrorReport)
310	}
311
312	if reportErrorEvent != nil {
313		emitEvent(reportErrorEvent)
314	}
315	emitEvent(tracer.postFlush(reportErrorEvent))
316
317	if err == nil && resp.DevMode() {
318		tracer.metaEventReportingEnabled = true
319	}
320
321	if err == nil && !resp.DevMode() {
322		tracer.metaEventReportingEnabled = false
323	}
324
325	if err == nil && resp.Disable() {
326		tracer.Disable()
327	}
328}
329
330// preFlush handles lock-protected data manipulation before flushing
331func (tracer *tracerImpl) preFlush() *eventFlushError {
332	tracer.lock.Lock()
333	defer tracer.lock.Unlock()
334
335	if tracer.disabled {
336		return newEventFlushError(errFlushFailedTracerClosed, FlushErrorTracerDisabled)
337	}
338
339	if tracer.connection == nil {
340		return newEventFlushError(errFlushFailedTracerClosed, FlushErrorTracerClosed)
341	}
342
343	now := time.Now()
344	tracer.buffer, tracer.flushing = tracer.flushing, tracer.buffer
345	tracer.reportInFlight = true
346	tracer.flushing.setFlushing(now)
347	tracer.buffer.setCurrent(now)
348	tracer.lastReportAttempt = now
349	return nil
350}
351
352// postFlush handles lock-protected data manipulation after flushing
353func (tracer *tracerImpl) postFlush(flushEventError *eventFlushError) *eventStatusReport {
354	tracer.lock.Lock()
355	defer tracer.lock.Unlock()
356
357	tracer.reportInFlight = false
358
359	statusReportEvent := newEventStatusReport(
360		tracer.flushing.reportStart,
361		tracer.flushing.reportEnd,
362		len(tracer.flushing.rawSpans),
363		int(tracer.flushing.droppedSpanCount+tracer.buffer.droppedSpanCount),
364		int(tracer.flushing.logEncoderErrorCount+tracer.buffer.logEncoderErrorCount),
365	)
366
367	if flushEventError == nil {
368		tracer.flushing.clear()
369		return statusReportEvent
370	}
371
372	switch flushEventError.State() {
373	case FlushErrorTranslate:
374		// When there's a translation error, we do not want to retry.
375		tracer.flushing.clear()
376	default:
377		// Restore the records that did not get sent correctly
378		tracer.buffer.mergeFrom(&tracer.flushing)
379	}
380
381	statusReportEvent.SetSentSpans(0)
382
383	return statusReportEvent
384}
385
386func (tracer *tracerImpl) Disable() {
387	tracer.lock.Lock()
388	if tracer.disabled {
389		tracer.lock.Unlock()
390		return
391	}
392	tracer.disabled = true
393	tracer.buffer.clear()
394	tracer.lock.Unlock()
395
396	emitEvent(newEventTracerDisabled())
397}
398
// Every MinReportingPeriod the reporting loop wakes up and checks whether
// either (a) ReportingPeriod will have elapsed since the last report attempt
// before the next wake-up, or (b) the span buffer reports that it is half
// full. If either condition holds, pending data is flushed to the remote
// peer; otherwise the reporting loop waits until the next cycle. See
// shouldFlushLocked for details.
//
// This could alternatively be implemented using flush channels and so forth,
// but that would introduce opportunities for client code to block on the
// tracer library, and we want to avoid that at all costs (even dropping data,
// which can certainly happen with high data rates and/or unresponsive remote
// peers).
412
413func (tracer *tracerImpl) shouldFlushLocked(now time.Time) bool {
414	if now.Add(tracer.opts.MinReportingPeriod).Sub(tracer.lastReportAttempt) > tracer.opts.ReportingPeriod {
415		return true
416	} else if tracer.buffer.isHalfFull() {
417		return true
418	}
419	return false
420}
421
422func (tracer *tracerImpl) reportLoop() {
423	tickerChan := time.Tick(tracer.opts.MinReportingPeriod)
424	for {
425		select {
426		case <-tickerChan:
427			now := time.Now()
428
429			tracer.lock.Lock()
430			disabled := tracer.disabled
431			reconnect := !tracer.reportInFlight && tracer.client.ShouldReconnect()
432			shouldFlush := tracer.shouldFlushLocked(now)
433			tracer.lock.Unlock()
434
435			if disabled {
436				return
437			}
438			if shouldFlush {
439				tracer.Flush(context.Background())
440			}
441			if reconnect {
442				tracer.reconnectClient(now)
443			}
444		case <-tracer.closeReportLoopChannel:
445			close(tracer.reportLoopClosedChannel)
446			return
447		}
448	}
449}
450