1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
16
17// This code was based on
18// contrib.go.opencensus.io/exporter/ocagent/connection.go
19
20import (
21	"context"
22	"errors"
23	"fmt"
24	"sync"
25	"unsafe"
26
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/metadata"
29
30	colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
31	coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
32	"go.opentelemetry.io/otel/exporters/otlp/internal/transform"
33	"go.opentelemetry.io/otel/metric"
34	metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
35	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
36	tracesdk "go.opentelemetry.io/otel/sdk/export/trace"
37)
38
39// Exporter is an OpenTelemetry exporter. It exports both traces and metrics
40// from OpenTelemetry instrumented to code using OpenTelemetry protocol
41// buffers to a configurable receiver.
42type Exporter struct {
43	// mu protects the non-atomic and non-channel variables
44	mu sync.RWMutex
45	// senderMu protects the concurrent unsafe sends on the shared gRPC client connection.
46	senderMu          sync.Mutex
47	started           bool
48	traceExporter     coltracepb.TraceServiceClient
49	metricExporter    colmetricpb.MetricsServiceClient
50	grpcClientConn    *grpc.ClientConn
51	lastConnectErrPtr unsafe.Pointer
52
53	startOnce      sync.Once
54	stopOnce       sync.Once
55	stopCh         chan struct{}
56	disconnectedCh chan bool
57
58	backgroundConnectionDoneCh chan bool
59
60	c        config
61	metadata metadata.MD
62}
63
64var _ tracesdk.SpanExporter = (*Exporter)(nil)
65var _ metricsdk.Exporter = (*Exporter)(nil)
66
67// newConfig initializes a config struct with default values and applies
68// any ExporterOptions provided.
69func newConfig(opts ...ExporterOption) config {
70	cfg := config{
71		grpcServiceConfig: DefaultGRPCServiceConfig,
72
73		// Note: the default ExportKindSelector is specified
74		// as Cumulative:
75		// https://github.com/open-telemetry/opentelemetry-specification/issues/731
76		exportKindSelector: metricsdk.CumulativeExportKindSelector(),
77	}
78	for _, opt := range opts {
79		opt(&cfg)
80	}
81	return cfg
82}
83
84// NewExporter constructs a new Exporter and starts it.
85func NewExporter(opts ...ExporterOption) (*Exporter, error) {
86	exp := NewUnstartedExporter(opts...)
87	if err := exp.Start(); err != nil {
88		return nil, err
89	}
90	return exp, nil
91}
92
93// NewUnstartedExporter constructs a new Exporter and does not start it.
94func NewUnstartedExporter(opts ...ExporterOption) *Exporter {
95	e := new(Exporter)
96	e.c = newConfig(opts...)
97	if len(e.c.headers) > 0 {
98		e.metadata = metadata.New(e.c.headers)
99	}
100	return e
101}
102
103var (
104	errAlreadyStarted  = errors.New("already started")
105	errNotStarted      = errors.New("not started")
106	errDisconnected    = errors.New("exporter disconnected")
107	errStopped         = errors.New("exporter stopped")
108	errContextCanceled = errors.New("context canceled")
109)
110
111// Start dials to the collector, establishing a connection to it. It also
112// initiates the Config and Trace services by sending over the initial
113// messages that consist of the node identifier. Start invokes a background
114// connector that will reattempt connections to the collector periodically
115// if the connection dies.
116func (e *Exporter) Start() error {
117	var err = errAlreadyStarted
118	e.startOnce.Do(func() {
119		e.mu.Lock()
120		e.started = true
121		e.disconnectedCh = make(chan bool, 1)
122		e.stopCh = make(chan struct{})
123		e.backgroundConnectionDoneCh = make(chan bool)
124		e.mu.Unlock()
125
126		// An optimistic first connection attempt to ensure that
127		// applications under heavy load can immediately process
128		// data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63
129		if err := e.connect(); err == nil {
130			e.setStateConnected()
131		} else {
132			e.setStateDisconnected(err)
133		}
134		go e.indefiniteBackgroundConnection()
135
136		err = nil
137	})
138
139	return err
140}
141
142func (e *Exporter) prepareCollectorAddress() string {
143	if e.c.collectorAddr != "" {
144		return e.c.collectorAddr
145	}
146	return fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort)
147}
148
149func (e *Exporter) enableConnections(cc *grpc.ClientConn) error {
150	e.mu.RLock()
151	started := e.started
152	e.mu.RUnlock()
153
154	if !started {
155		return errNotStarted
156	}
157
158	e.mu.Lock()
159	// If previous clientConn is same as the current then just return.
160	// This doesn't happen right now as this func is only called with new ClientConn.
161	// It is more about future-proofing.
162	if e.grpcClientConn == cc {
163		e.mu.Unlock()
164		return nil
165	}
166	// If the previous clientConn was non-nil, close it
167	if e.grpcClientConn != nil {
168		_ = e.grpcClientConn.Close()
169	}
170	e.grpcClientConn = cc
171	e.traceExporter = coltracepb.NewTraceServiceClient(cc)
172	e.metricExporter = colmetricpb.NewMetricsServiceClient(cc)
173	e.mu.Unlock()
174
175	return nil
176}
177
178func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context {
179	if e.metadata.Len() > 0 {
180		return metadata.NewOutgoingContext(ctx, e.metadata)
181	}
182	return ctx
183}
184
185func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
186	addr := e.prepareCollectorAddress()
187
188	dialOpts := []grpc.DialOption{}
189	if e.c.grpcServiceConfig != "" {
190		dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(e.c.grpcServiceConfig))
191	}
192	if e.c.clientCredentials != nil {
193		dialOpts = append(dialOpts, grpc.WithTransportCredentials(e.c.clientCredentials))
194	} else if e.c.canDialInsecure {
195		dialOpts = append(dialOpts, grpc.WithInsecure())
196	}
197	if e.c.compressor != "" {
198		dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(e.c.compressor)))
199	}
200	if len(e.c.grpcDialOptions) != 0 {
201		dialOpts = append(dialOpts, e.c.grpcDialOptions...)
202	}
203
204	ctx := e.contextWithMetadata(context.Background())
205	return grpc.DialContext(ctx, addr, dialOpts...)
206}
207
// closeStopCh closes the exporter's stop channel. It is declared as a
// package-level function variable rather than a plain function so that tests
// can swap in their own implementation.
var closeStopCh = func(ch chan struct{}) {
	close(ch)
}
212
213// Shutdown closes all connections and releases resources currently being used
214// by the exporter. If the exporter is not started this does nothing.
215func (e *Exporter) Shutdown(ctx context.Context) error {
216	e.mu.RLock()
217	cc := e.grpcClientConn
218	started := e.started
219	e.mu.RUnlock()
220
221	if !started {
222		return nil
223	}
224
225	var err error
226
227	e.stopOnce.Do(func() {
228		if cc != nil {
229			// Clean things up before checking this error.
230			err = cc.Close()
231		}
232
233		// At this point we can change the state variable started
234		e.mu.Lock()
235		e.started = false
236		e.mu.Unlock()
237		closeStopCh(e.stopCh)
238
239		// Ensure that the backgroundConnector returns
240		select {
241		case <-e.backgroundConnectionDoneCh:
242		case <-ctx.Done():
243			err = ctx.Err()
244		}
245	})
246
247	return err
248}
249
250// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter
251// interface. It transforms and batches metric Records into OTLP Metrics and
252// transmits them to the configured collector.
253func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error {
254	// Unify the parent context Done signal with the exporter stopCh.
255	ctx, cancel := context.WithCancel(parent)
256	defer cancel()
257	go func(ctx context.Context, cancel context.CancelFunc) {
258		select {
259		case <-ctx.Done():
260		case <-e.stopCh:
261			cancel()
262		}
263	}(ctx, cancel)
264
265	// Hardcode the number of worker goroutines to 1. We later will
266	// need to see if there's a way to adjust that number for longer
267	// running operations.
268	rms, err := transform.CheckpointSet(ctx, e, cps, 1)
269	if err != nil {
270		return err
271	}
272
273	if !e.connected() {
274		return errDisconnected
275	}
276
277	select {
278	case <-e.stopCh:
279		return errStopped
280	case <-ctx.Done():
281		return errContextCanceled
282	default:
283		e.senderMu.Lock()
284		_, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{
285			ResourceMetrics: rms,
286		})
287		e.senderMu.Unlock()
288		if err != nil {
289			return err
290		}
291	}
292	return nil
293}
294
295// ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter
296// metric telemetry that it needs to be provided in a cumulative format.
297func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricsdk.ExportKind {
298	return e.c.exportKindSelector.ExportKindFor(desc, kind)
299}
300
301// ExportSpans exports a batch of SpanData.
302func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) error {
303	return e.uploadTraces(ctx, sds)
304}
305
306func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) error {
307	select {
308	case <-e.stopCh:
309		return nil
310	default:
311		if !e.connected() {
312			return nil
313		}
314
315		protoSpans := transform.SpanData(sdl)
316		if len(protoSpans) == 0 {
317			return nil
318		}
319
320		e.senderMu.Lock()
321		_, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{
322			ResourceSpans: protoSpans,
323		})
324		e.senderMu.Unlock()
325		if err != nil {
326			e.setStateDisconnected(err)
327			return err
328		}
329	}
330	return nil
331}
332