1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package opencensusexporter
16
17import (
18	"context"
19	"errors"
20	"fmt"
21
22	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
23	agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
24	agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
25	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
26	"google.golang.org/grpc"
27	"google.golang.org/grpc/metadata"
28
29	"go.opentelemetry.io/collector/component"
30	"go.opentelemetry.io/collector/model/pdata"
31	"go.opentelemetry.io/collector/translator/internaldata"
32)
33
// tracesClientWithCancel pairs an open trace Export stream with the
// CancelFunc of the context that stream was created with.
//
// See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream
// why we need to keep the cancel func to cancel the stream: cancelling the
// stream's context is one of the ways to release the resources it holds.
type tracesClientWithCancel struct {
	// cancel cancels the context passed to Export; call it when the stream
	// is discarded.
	cancel context.CancelFunc
	// tsec is the client side of the trace Export stream.
	tsec agenttracepb.TraceService_ExportClient
}
40
// metricsClientWithCancel pairs an open metrics Export stream with the
// CancelFunc of the context that stream was created with.
//
// See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream
// why we need to keep the cancel func to cancel the stream: cancelling the
// stream's context is one of the ways to release the resources it holds.
type metricsClientWithCancel struct {
	// cancel cancels the context passed to Export; call it when the stream
	// is discarded.
	cancel context.CancelFunc
	// msec is the client side of the metrics Export stream.
	msec agentmetricspb.MetricsService_ExportClient
}
47
// ocExporter sends traces or metrics to an OpenCensus endpoint over a single
// gRPC connection, using at most cfg.NumWorkers concurrent Export streams.
// Which signal it handles is decided by which worker channel its constructor
// allocates (newTracesExporter or newMetricsExporter).
type ocExporter struct {
	// cfg is the configuration the exporter was created with.
	cfg *Config
	// gRPC clients and connection. start only creates the service client
	// whose worker channel is non-nil.
	traceSvcClient   agenttracepb.TraceServiceClient
	metricsSvcClient agentmetricspb.MetricsServiceClient
	// In any of the channels we keep always NumWorkers object (sometimes nil),
	// to make sure we don't open more than NumWorkers RPCs at any moment.
	// A nil entry means the slot has no usable stream (none opened yet, or
	// the last one failed); the push path opens a new one on demand.
	// With the constructors in this file only one of the two is non-nil.
	tracesClients  chan *tracesClientWithCancel
	metricsClients chan *metricsClientWithCancel
	// grpcClientConn is the connection dialed by start; nil until start
	// succeeds.
	grpcClientConn *grpc.ClientConn
	// metadata is built from the configured headers in newOcExporter.
	// NOTE(review): not read by any code visible in this file (the RPC
	// helpers rebuild metadata from cfg.Headers); confirm whether it is used
	// elsewhere or can be removed.
	metadata metadata.MD
}
60
61func newOcExporter(_ context.Context, cfg *Config) (*ocExporter, error) {
62	if cfg.Endpoint == "" {
63		return nil, errors.New("OpenCensus exporter cfg requires an Endpoint")
64	}
65
66	if cfg.NumWorkers <= 0 {
67		return nil, errors.New("OpenCensus exporter cfg requires at least one worker")
68	}
69
70	oce := &ocExporter{
71		cfg:      cfg,
72		metadata: metadata.New(cfg.GRPCClientSettings.Headers),
73	}
74	return oce, nil
75}
76
77// start creates the gRPC client Connection
78func (oce *ocExporter) start(ctx context.Context, host component.Host) error {
79	dialOpts, err := oce.cfg.GRPCClientSettings.ToDialOptions(host.GetExtensions())
80	if err != nil {
81		return err
82	}
83	var clientConn *grpc.ClientConn
84	if clientConn, err = grpc.DialContext(ctx, oce.cfg.GRPCClientSettings.Endpoint, dialOpts...); err != nil {
85		return err
86	}
87
88	oce.grpcClientConn = clientConn
89
90	if oce.tracesClients != nil {
91		oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn)
92		// Try to create rpc clients now.
93		for i := 0; i < oce.cfg.NumWorkers; i++ {
94			// Populate the channel with NumWorkers nil RPCs to keep the number of workers
95			// constant in the channel.
96			oce.tracesClients <- nil
97		}
98	}
99
100	if oce.metricsClients != nil {
101		oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn)
102		// Try to create rpc clients now.
103		for i := 0; i < oce.cfg.NumWorkers; i++ {
104			// Populate the channel with NumWorkers nil RPCs to keep the number of workers
105			// constant in the channel.
106			oce.metricsClients <- nil
107		}
108	}
109	return nil
110}
111
112func (oce *ocExporter) shutdown(context.Context) error {
113	if oce.tracesClients != nil {
114		// First remove all the clients from the channel.
115		for i := 0; i < oce.cfg.NumWorkers; i++ {
116			<-oce.tracesClients
117		}
118		// Now close the channel
119		close(oce.tracesClients)
120	}
121	if oce.metricsClients != nil {
122		// First remove all the clients from the channel.
123		for i := 0; i < oce.cfg.NumWorkers; i++ {
124			<-oce.metricsClients
125		}
126		// Now close the channel
127		close(oce.metricsClients)
128	}
129	return oce.grpcClientConn.Close()
130}
131
132func newTracesExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
133	oce, err := newOcExporter(ctx, cfg)
134	if err != nil {
135		return nil, err
136	}
137	oce.tracesClients = make(chan *tracesClientWithCancel, oce.cfg.NumWorkers)
138	return oce, nil
139}
140
141func newMetricsExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
142	oce, err := newOcExporter(ctx, cfg)
143	if err != nil {
144		return nil, err
145	}
146	oce.metricsClients = make(chan *metricsClientWithCancel, oce.cfg.NumWorkers)
147	return oce, nil
148}
149
150func (oce *ocExporter) pushTraces(_ context.Context, td pdata.Traces) error {
151	// Get first available trace Client.
152	tClient, ok := <-oce.tracesClients
153	if !ok {
154		err := errors.New("failed to push traces, OpenCensus exporter was already stopped")
155		return err
156	}
157
158	// In any of the metricsClients channel we keep always NumWorkers object (sometimes nil),
159	// to make sure we don't open more than NumWorkers RPCs at any moment.
160	// Here check if the client is nil and create a new one if that is the case. A nil
161	// object means that an error happened: could not connect, service went down, etc.
162	if tClient == nil {
163		var err error
164		tClient, err = oce.createTraceServiceRPC()
165		if err != nil {
166			// Cannot create an RPC, put back nil to keep the number of workers constant.
167			oce.tracesClients <- nil
168			return err
169		}
170	}
171
172	rss := td.ResourceSpans()
173	for i := 0; i < rss.Len(); i++ {
174		node, resource, spans := internaldata.ResourceSpansToOC(rss.At(i))
175		// This is a hack because OC protocol expects a Node for the initial message.
176		if node == nil {
177			node = &commonpb.Node{}
178		}
179		if resource == nil {
180			resource = &resourcepb.Resource{}
181		}
182		req := &agenttracepb.ExportTraceServiceRequest{
183			Spans:    spans,
184			Resource: resource,
185			Node:     node,
186		}
187		if err := tClient.tsec.Send(req); err != nil {
188			// Error received, cancel the context used to create the RPC to free all resources,
189			// put back nil to keep the number of workers constant.
190			tClient.cancel()
191			oce.tracesClients <- nil
192			return err
193		}
194	}
195	oce.tracesClients <- tClient
196	return nil
197}
198
199func (oce *ocExporter) pushMetrics(_ context.Context, md pdata.Metrics) error {
200	// Get first available mClient.
201	mClient, ok := <-oce.metricsClients
202	if !ok {
203		err := errors.New("failed to push metrics, OpenCensus exporter was already stopped")
204		return err
205	}
206
207	// In any of the metricsClients channel we keep always NumWorkers object (sometimes nil),
208	// to make sure we don't open more than NumWorkers RPCs at any moment.
209	// Here check if the client is nil and create a new one if that is the case. A nil
210	// object means that an error happened: could not connect, service went down, etc.
211	if mClient == nil {
212		var err error
213		mClient, err = oce.createMetricsServiceRPC()
214		if err != nil {
215			// Cannot create an RPC, put back nil to keep the number of workers constant.
216			oce.metricsClients <- nil
217			return err
218		}
219	}
220
221	rms := md.ResourceMetrics()
222	for i := 0; i < rms.Len(); i++ {
223		ocReq := agentmetricspb.ExportMetricsServiceRequest{}
224		ocReq.Node, ocReq.Resource, ocReq.Metrics = internaldata.ResourceMetricsToOC(rms.At(i))
225
226		// This is a hack because OC protocol expects a Node for the initial message.
227		if ocReq.Node == nil {
228			ocReq.Node = &commonpb.Node{}
229		}
230		if ocReq.Resource == nil {
231			ocReq.Resource = &resourcepb.Resource{}
232		}
233		if err := mClient.msec.Send(&ocReq); err != nil {
234			// Error received, cancel the context used to create the RPC to free all resources,
235			// put back nil to keep the number of workers constant.
236			mClient.cancel()
237			oce.metricsClients <- nil
238			return err
239		}
240	}
241	oce.metricsClients <- mClient
242	return nil
243}
244
245func (oce *ocExporter) createTraceServiceRPC() (*tracesClientWithCancel, error) {
246	// Initiate the trace service by sending over node identifier info.
247	ctx, cancel := context.WithCancel(context.Background())
248	if len(oce.cfg.Headers) > 0 {
249		ctx = metadata.NewOutgoingContext(ctx, metadata.New(oce.cfg.Headers))
250	}
251	// Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever.
252	traceClient, err := oce.traceSvcClient.Export(ctx)
253	if err != nil {
254		cancel()
255		return nil, fmt.Errorf("TraceServiceClient: %w", err)
256	}
257	return &tracesClientWithCancel{cancel: cancel, tsec: traceClient}, nil
258}
259
260func (oce *ocExporter) createMetricsServiceRPC() (*metricsClientWithCancel, error) {
261	// Initiate the trace service by sending over node identifier info.
262	ctx, cancel := context.WithCancel(context.Background())
263	if len(oce.cfg.Headers) > 0 {
264		ctx = metadata.NewOutgoingContext(ctx, metadata.New(oce.cfg.Headers))
265	}
266	// Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever.
267	metricsClient, err := oce.metricsSvcClient.Export(ctx)
268	if err != nil {
269		cancel()
270		return nil, fmt.Errorf("MetricsServiceClient: %w", err)
271	}
272	return &metricsClientWithCancel{cancel: cancel, msec: metricsClient}, nil
273}
274