1// Copyright 2018, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package ocagent
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"io"
22	"sync"
23	"time"
24	"unsafe"
25
26	"google.golang.org/api/support/bundler"
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/credentials"
29	"google.golang.org/grpc/metadata"
30
31	"go.opencensus.io/plugin/ocgrpc"
32	"go.opencensus.io/resource"
33	"go.opencensus.io/stats/view"
34	"go.opencensus.io/trace"
35
36	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
37	agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
38	agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
39	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
40	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
41	tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
42)
43
44var startupMu sync.Mutex
45var startTime time.Time
46
47func init() {
48	startupMu.Lock()
49	startTime = time.Now()
50	startupMu.Unlock()
51}
52
// Compile-time assertions that *Exporter satisfies both the trace.Exporter
// and the view.Exporter interfaces.
var _ trace.Exporter = (*Exporter)(nil)
var _ view.Exporter = (*Exporter)(nil)
55
// Exporter ships OpenCensus spans and view data to an agent over gRPC.
// Spans and view data handed to ExportSpan/ExportView are buffered in
// bundlers and uploaded in batches on the trace and metrics export streams.
type Exporter struct {
	// mu protects the non-atomic and non-channel variables
	mu sync.RWMutex
	// senderMu protects the concurrent unsafe send on traceExporter client
	senderMu sync.Mutex
	// recvMu protects the concurrent unsafe recv on traceExporter client
	recvMu             sync.Mutex
	// started is set by Start and cleared again by Stop.
	started            bool
	// stopped is set by Stop once the exporter has been shut down.
	stopped            bool
	// agentAddress is the address to dial; when empty, prepareAgentAddress
	// falls back to DefaultAgentHost:DefaultAgentPort.
	agentAddress       string
	// serviceName is passed to NodeWithStartTime to build nodeInfo.
	serviceName        string
	// canDialInsecure selects grpc.WithInsecure when no transport
	// credentials are configured.
	canDialInsecure    bool
	// traceExporter is the current trace Export stream.
	traceExporter      agenttracepb.TraceService_ExportClient
	// metricsExporter is the current metrics Export stream.
	metricsExporter    agentmetricspb.MetricsService_ExportClient
	// nodeInfo is sent in the first message of each stream.
	nodeInfo           *commonpb.Node
	// grpcClientConn is the connection currently in use; it is replaced by
	// enableConnectionStreams and closed by Stop.
	grpcClientConn     *grpc.ClientConn
	// NOTE(review): reconnectionPeriod is not read in this part of the file;
	// presumably it paces the background reconnection loop — confirm.
	reconnectionPeriod time.Duration
	// resourceDetector, when set, is invoked by NewUnstartedExporter to
	// populate resource; otherwise the resource comes from the environment.
	resourceDetector   resource.Detector
	// resource is attached to the first message of the trace and metrics
	// streams.
	resource           *resourcepb.Resource
	// compressor, when non-empty, is the name given to grpc.UseCompressor.
	compressor         string
	// headers are attached as outgoing gRPC metadata.
	headers            map[string]string
	// NOTE(review): lastConnectErrPtr is not accessed in this part of the
	// file; presumably it backs lastConnectError() — confirm.
	lastConnectErrPtr  unsafe.Pointer

	// startOnce ensures that the body of Start runs at most once.
	startOnce      sync.Once
	// stopCh is closed by Stop; exports check it to bail out after shutdown.
	stopCh         chan bool
	// disconnectedCh is created by Start with a buffer of one.
	disconnectedCh chan bool

	// backgroundConnectionDoneCh is received from by Stop to wait for the
	// background connection goroutine to return.
	backgroundConnectionDoneCh chan bool

	// traceBundler batches span data before handing it to uploadTraces.
	traceBundler *bundler.Bundler

	// viewDataBundler is the bundler to enable conversion
	// from OpenCensus-Go view.Data to metricspb.Metric.
	// Please do not confuse it with metricsBundler!
	viewDataBundler *bundler.Bundler

	// clientTransportCredentials, when non-nil, are used for dialing and
	// take precedence over canDialInsecure.
	clientTransportCredentials credentials.TransportCredentials

	// grpcDialOptions are extra dial options appended after the built-in ones.
	grpcDialOptions []grpc.DialOption
}
96
97func NewExporter(opts ...ExporterOption) (*Exporter, error) {
98	exp, err := NewUnstartedExporter(opts...)
99	if err != nil {
100		return nil, err
101	}
102	if err := exp.Start(); err != nil {
103		return nil, err
104	}
105	return exp, nil
106}
107
108const spanDataBufferSize = 300
109
110func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) {
111	e := new(Exporter)
112	for _, opt := range opts {
113		opt.withExporter(e)
114	}
115	traceBundler := bundler.NewBundler((*trace.SpanData)(nil), func(bundle interface{}) {
116		e.uploadTraces(bundle.([]*trace.SpanData))
117	})
118	traceBundler.DelayThreshold = 2 * time.Second
119	traceBundler.BundleCountThreshold = spanDataBufferSize
120	e.traceBundler = traceBundler
121
122	viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
123		e.uploadViewData(bundle.([]*view.Data))
124	})
125	viewDataBundler.DelayThreshold = 2 * time.Second
126	viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable.
127	e.viewDataBundler = viewDataBundler
128	e.nodeInfo = NodeWithStartTime(e.serviceName)
129	if e.resourceDetector != nil {
130		res, err := e.resourceDetector(context.Background())
131		if err != nil {
132			panic(fmt.Sprintf("Error detecting resource. err:%v\n", err))
133		}
134		if res != nil {
135			e.resource = resourceToResourcePb(res)
136		}
137	} else {
138		e.resource = resourceProtoFromEnv()
139	}
140
141	return e, nil
142}
143
// Retry limits for the initial handshakes.
// NOTE(review): neither constant is referenced in this part of the file;
// confirm they are still used elsewhere.
const (
	maxInitialConfigRetries = 10
	maxInitialTracesRetries = 10
)

// Sentinel errors describing the exporter's lifecycle state.
var (
	// errAlreadyStarted is returned by Start on every call after the first.
	errAlreadyStarted = errors.New("already started")
	// errNotStarted is returned when an operation requires a started exporter.
	errNotStarted     = errors.New("not started")
	// errStopped is returned by the export methods once stopCh is closed.
	errStopped        = errors.New("stopped")
)
154
155// Start dials to the agent, establishing a connection to it. It also
156// initiates the Config and Trace services by sending over the initial
157// messages that consist of the node identifier. Start invokes a background
158// connector that will reattempt connections to the agent periodically
159// if the connection dies.
160func (ae *Exporter) Start() error {
161	var err = errAlreadyStarted
162	ae.startOnce.Do(func() {
163		ae.mu.Lock()
164		ae.started = true
165		ae.disconnectedCh = make(chan bool, 1)
166		ae.stopCh = make(chan bool)
167		ae.backgroundConnectionDoneCh = make(chan bool)
168		ae.mu.Unlock()
169
170		// An optimistic first connection attempt to ensure that
171		// applications under heavy load can immediately process
172		// data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63
173		if err := ae.connect(); err == nil {
174			ae.setStateConnected()
175		} else {
176			ae.setStateDisconnected(err)
177		}
178		go ae.indefiniteBackgroundConnection()
179
180		err = nil
181	})
182
183	return err
184}
185
186func (ae *Exporter) prepareAgentAddress() string {
187	if ae.agentAddress != "" {
188		return ae.agentAddress
189	}
190	return fmt.Sprintf("%s:%d", DefaultAgentHost, DefaultAgentPort)
191}
192
193func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error {
194	ae.mu.RLock()
195	started := ae.started
196	nodeInfo := ae.nodeInfo
197	ae.mu.RUnlock()
198
199	if !started {
200		return errNotStarted
201	}
202
203	ae.mu.Lock()
204	// If the previous clientConn was non-nil, close it
205	if ae.grpcClientConn != nil {
206		_ = ae.grpcClientConn.Close()
207	}
208	ae.grpcClientConn = cc
209	ae.mu.Unlock()
210
211	if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil {
212		return err
213	}
214
215	return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo)
216}
217
218func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
219	// Initiate the trace service by sending over node identifier info.
220	traceSvcClient := agenttracepb.NewTraceServiceClient(cc)
221	ctx := context.Background()
222	if len(ae.headers) > 0 {
223		ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
224	}
225	traceExporter, err := traceSvcClient.Export(ctx)
226	if err != nil {
227		return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err)
228	}
229
230	firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{
231		Node:     node,
232		Resource: ae.resource,
233	}
234	if err := traceExporter.Send(firstTraceMessage); err != nil {
235		return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
236	}
237
238	ae.mu.Lock()
239	ae.traceExporter = traceExporter
240	ae.mu.Unlock()
241
242	// Initiate the config service by sending over node identifier info.
243	configStream, err := traceSvcClient.Config(context.Background())
244	if err != nil {
245		return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err)
246	}
247	firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node}
248	if err := configStream.Send(firstCfgMessage); err != nil {
249		return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
250	}
251
252	// In the background, handle trace configurations that are beamed down
253	// by the agent, but also reply to it with the applied configuration.
254	go ae.handleConfigStreaming(configStream)
255
256	return nil
257}
258
259func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
260	metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc)
261	metricsExporter, err := metricsSvcClient.Export(context.Background())
262	if err != nil {
263		return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err)
264	}
265	// Initiate the metrics service by sending over the first message just containing the Node and Resource.
266	firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{
267		Node:     node,
268		Resource: ae.resource,
269	}
270	if err := metricsExporter.Send(firstMetricsMessage); err != nil {
271		return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err)
272	}
273
274	ae.mu.Lock()
275	ae.metricsExporter = metricsExporter
276	ae.mu.Unlock()
277
278	// With that we are good to go and can start sending metrics
279	return nil
280}
281
282func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) {
283	addr := ae.prepareAgentAddress()
284	var dialOpts []grpc.DialOption
285	if ae.clientTransportCredentials != nil {
286		dialOpts = append(dialOpts, grpc.WithTransportCredentials(ae.clientTransportCredentials))
287	} else if ae.canDialInsecure {
288		dialOpts = append(dialOpts, grpc.WithInsecure())
289	}
290	if ae.compressor != "" {
291		dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(ae.compressor)))
292	}
293	dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
294	if len(ae.grpcDialOptions) != 0 {
295		dialOpts = append(dialOpts, ae.grpcDialOptions...)
296	}
297
298	ctx := context.Background()
299	if len(ae.headers) > 0 {
300		ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
301	}
302	return grpc.DialContext(ctx, addr, dialOpts...)
303}
304
305func (ae *Exporter) handleConfigStreaming(configStream agenttracepb.TraceService_ConfigClient) error {
306	// Note: We haven't yet implemented configuration sending so we
307	// should NOT be changing connection states within this function for now.
308	for {
309		recv, err := configStream.Recv()
310		if err != nil {
311			// TODO: Check if this is a transient error or exponential backoff-able.
312			return err
313		}
314		cfg := recv.Config
315		if cfg == nil {
316			continue
317		}
318
319		// Otherwise now apply the trace configuration sent down from the agent
320		if psamp := cfg.GetProbabilitySampler(); psamp != nil {
321			trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(psamp.SamplingProbability)})
322		} else if csamp := cfg.GetConstantSampler(); csamp != nil {
323			alwaysSample := csamp.Decision == tracepb.ConstantSampler_ALWAYS_ON
324			if alwaysSample {
325				trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
326			} else {
327				trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()})
328			}
329		} else { // TODO: Add the rate limiting sampler here
330		}
331
332		// Then finally send back to upstream the newly applied configuration
333		err = configStream.Send(&agenttracepb.CurrentLibraryConfig{Config: &tracepb.TraceConfig{Sampler: cfg.Sampler}})
334		if err != nil {
335			return err
336		}
337	}
338}
339
340// Stop shuts down all the connections and resources
341// related to the exporter.
342func (ae *Exporter) Stop() error {
343	ae.mu.RLock()
344	cc := ae.grpcClientConn
345	started := ae.started
346	stopped := ae.stopped
347	ae.mu.RUnlock()
348
349	if !started {
350		return errNotStarted
351	}
352	if stopped {
353		// TODO: tell the user that we've already stopped, so perhaps a sentinel error?
354		return nil
355	}
356
357	ae.Flush()
358
359	// Now close the underlying gRPC connection.
360	var err error
361	if cc != nil {
362		err = cc.Close()
363	}
364
365	// At this point we can change the state variables: started and stopped
366	ae.mu.Lock()
367	ae.started = false
368	ae.stopped = true
369	ae.mu.Unlock()
370	close(ae.stopCh)
371
372	// Ensure that the backgroundConnector returns
373	<-ae.backgroundConnectionDoneCh
374
375	return err
376}
377
378func (ae *Exporter) ExportSpan(sd *trace.SpanData) {
379	if sd == nil {
380		return
381	}
382	_ = ae.traceBundler.Add(sd, 1)
383}
384
385func (ae *Exporter) ExportTraceServiceRequest(batch *agenttracepb.ExportTraceServiceRequest) error {
386	if batch == nil || len(batch.Spans) == 0 {
387		return nil
388	}
389
390	select {
391	case <-ae.stopCh:
392		return errStopped
393
394	default:
395		if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil {
396			return fmt.Errorf("ExportTraceServiceRequest: no active connection, last connection error: %v", lastConnectErr)
397		}
398
399		ae.senderMu.Lock()
400		err := ae.traceExporter.Send(batch)
401		ae.senderMu.Unlock()
402		if err != nil {
403			if err == io.EOF {
404				ae.recvMu.Lock()
405				// Perform a .Recv to try to find out why the RPC actually ended.
406				// See:
407				//   * https://github.com/grpc/grpc-go/blob/d389f9fac68eea0dcc49957d0b4cca5b3a0a7171/stream.go#L98-L100
408				//   * https://groups.google.com/forum/#!msg/grpc-io/XcN4hA9HonI/F_UDiejTAwAJ
409				for {
410					_, err = ae.traceExporter.Recv()
411					if err != nil {
412						break
413					}
414				}
415				ae.recvMu.Unlock()
416			}
417
418			ae.setStateDisconnected(err)
419			if err != io.EOF {
420				return err
421			}
422		}
423		return nil
424	}
425}
426
427func (ae *Exporter) ExportView(vd *view.Data) {
428	if vd == nil {
429		return
430	}
431	_ = ae.viewDataBundler.Add(vd, 1)
432}
433
434// ExportMetricsServiceRequest sends proto metrics with the metrics service client.
435func (ae *Exporter) ExportMetricsServiceRequest(batch *agentmetricspb.ExportMetricsServiceRequest) error {
436	if batch == nil || len(batch.Metrics) == 0 {
437		return nil
438	}
439
440	select {
441	case <-ae.stopCh:
442		return errStopped
443
444	default:
445		if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil {
446			return fmt.Errorf("ExportMetricsServiceRequest: no active connection, last connection error: %v", lastConnectErr)
447		}
448
449		ae.senderMu.Lock()
450		err := ae.metricsExporter.Send(batch)
451		ae.senderMu.Unlock()
452		if err != nil {
453			if err == io.EOF {
454				ae.recvMu.Lock()
455				// Perform a .Recv to try to find out why the RPC actually ended.
456				// See:
457				//   * https://github.com/grpc/grpc-go/blob/d389f9fac68eea0dcc49957d0b4cca5b3a0a7171/stream.go#L98-L100
458				//   * https://groups.google.com/forum/#!msg/grpc-io/XcN4hA9HonI/F_UDiejTAwAJ
459				for {
460					_, err = ae.metricsExporter.Recv()
461					if err != nil {
462						break
463					}
464				}
465				ae.recvMu.Unlock()
466			}
467
468			ae.setStateDisconnected(err)
469			if err != io.EOF {
470				return err
471			}
472		}
473		return nil
474	}
475}
476
477func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span {
478	if len(sdl) == 0 {
479		return nil
480	}
481	protoSpans := make([]*tracepb.Span, 0, len(sdl))
482	for _, sd := range sdl {
483		if sd != nil {
484			protoSpans = append(protoSpans, ocSpanToProtoSpan(sd))
485		}
486	}
487	return protoSpans
488}
489
490func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) {
491	select {
492	case <-ae.stopCh:
493		return
494
495	default:
496		if !ae.connected() {
497			return
498		}
499
500		protoSpans := ocSpanDataToPbSpans(sdl)
501		if len(protoSpans) == 0 {
502			return
503		}
504		ae.senderMu.Lock()
505		err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{
506			Spans:    protoSpans,
507			Resource: resourceProtoFromEnv(),
508		})
509		ae.senderMu.Unlock()
510		if err != nil {
511			ae.setStateDisconnected(err)
512		}
513	}
514}
515
516func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric {
517	if len(vdl) == 0 {
518		return nil
519	}
520	metrics := make([]*metricspb.Metric, 0, len(vdl))
521	for _, vd := range vdl {
522		if vd != nil {
523			vmetric, err := viewDataToMetric(vd)
524			// TODO: (@odeke-em) somehow report this error, if it is non-nil.
525			if err == nil && vmetric != nil {
526				metrics = append(metrics, vmetric)
527			}
528		}
529	}
530	return metrics
531}
532
533func (ae *Exporter) uploadViewData(vdl []*view.Data) {
534	protoMetrics := ocViewDataToPbMetrics(vdl)
535	if len(protoMetrics) == 0 {
536		return
537	}
538	req := &agentmetricspb.ExportMetricsServiceRequest{
539		Metrics:  protoMetrics,
540		Resource: resourceProtoFromEnv(),
541		// TODO:(@odeke-em)
542		// a) Figure out how to derive a Node from the environment
543		// or better letting users of the exporter configure it.
544	}
545	ae.ExportMetricsServiceRequest(req)
546}
547
548func (ae *Exporter) Flush() {
549	ae.traceBundler.Flush()
550	ae.viewDataBundler.Flush()
551}
552
553func resourceProtoFromEnv() *resourcepb.Resource {
554	rs, _ := resource.FromEnv(context.Background())
555	if rs == nil {
556		return nil
557	}
558	return resourceToResourcePb(rs)
559}
560
561func resourceToResourcePb(rs *resource.Resource) *resourcepb.Resource {
562	rprs := &resourcepb.Resource{
563		Type: rs.Type,
564	}
565	if rs.Labels != nil {
566		rprs.Labels = make(map[string]string)
567		for k, v := range rs.Labels {
568			rprs.Labels[k] = v
569		}
570	}
571	return rprs
572}
573