1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package opencensusexporter 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 22 commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" 23 agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" 24 agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" 25 resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" 26 "google.golang.org/grpc" 27 "google.golang.org/grpc/metadata" 28 29 "go.opentelemetry.io/collector/component" 30 "go.opentelemetry.io/collector/model/pdata" 31 "go.opentelemetry.io/collector/translator/internaldata" 32) 33 34// See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream 35// why we need to keep the cancel func to cancel the stream 36type tracesClientWithCancel struct { 37 cancel context.CancelFunc 38 tsec agenttracepb.TraceService_ExportClient 39} 40 41// See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream 42// why we need to keep the cancel func to cancel the stream 43type metricsClientWithCancel struct { 44 cancel context.CancelFunc 45 msec agentmetricspb.MetricsService_ExportClient 46} 47 48type ocExporter struct { 49 cfg *Config 50 // gRPC clients and connection. 51 traceSvcClient agenttracepb.TraceServiceClient 52 metricsSvcClient agentmetricspb.MetricsServiceClient 53 // In any of the channels we keep always NumWorkers object (sometimes nil), 54 // to make sure we don't open more than NumWorkers RPCs at any moment. 55 tracesClients chan *tracesClientWithCancel 56 metricsClients chan *metricsClientWithCancel 57 grpcClientConn *grpc.ClientConn 58 metadata metadata.MD 59} 60 61func newOcExporter(_ context.Context, cfg *Config) (*ocExporter, error) { 62 if cfg.Endpoint == "" { 63 return nil, errors.New("OpenCensus exporter cfg requires an Endpoint") 64 } 65 66 if cfg.NumWorkers <= 0 { 67 return nil, errors.New("OpenCensus exporter cfg requires at least one worker") 68 } 69 70 oce := &ocExporter{ 71 cfg: cfg, 72 metadata: metadata.New(cfg.GRPCClientSettings.Headers), 73 } 74 return oce, nil 75} 76 77// start creates the gRPC client Connection 78func (oce *ocExporter) start(ctx context.Context, host component.Host) error { 79 dialOpts, err := oce.cfg.GRPCClientSettings.ToDialOptions(host.GetExtensions()) 80 if err != nil { 81 return err 82 } 83 var clientConn *grpc.ClientConn 84 if clientConn, err = grpc.DialContext(ctx, oce.cfg.GRPCClientSettings.Endpoint, dialOpts...); err != nil { 85 return err 86 } 87 88 oce.grpcClientConn = clientConn 89 90 if oce.tracesClients != nil { 91 oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn) 92 // Try to create rpc clients now. 93 for i := 0; i < oce.cfg.NumWorkers; i++ { 94 // Populate the channel with NumWorkers nil RPCs to keep the number of workers 95 // constant in the channel. 96 oce.tracesClients <- nil 97 } 98 } 99 100 if oce.metricsClients != nil { 101 oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn) 102 // Try to create rpc clients now. 103 for i := 0; i < oce.cfg.NumWorkers; i++ { 104 // Populate the channel with NumWorkers nil RPCs to keep the number of workers 105 // constant in the channel. 106 oce.metricsClients <- nil 107 } 108 } 109 return nil 110} 111 112func (oce *ocExporter) shutdown(context.Context) error { 113 if oce.tracesClients != nil { 114 // First remove all the clients from the channel. 115 for i := 0; i < oce.cfg.NumWorkers; i++ { 116 <-oce.tracesClients 117 } 118 // Now close the channel 119 close(oce.tracesClients) 120 } 121 if oce.metricsClients != nil { 122 // First remove all the clients from the channel. 123 for i := 0; i < oce.cfg.NumWorkers; i++ { 124 <-oce.metricsClients 125 } 126 // Now close the channel 127 close(oce.metricsClients) 128 } 129 return oce.grpcClientConn.Close() 130} 131 132func newTracesExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { 133 oce, err := newOcExporter(ctx, cfg) 134 if err != nil { 135 return nil, err 136 } 137 oce.tracesClients = make(chan *tracesClientWithCancel, oce.cfg.NumWorkers) 138 return oce, nil 139} 140 141func newMetricsExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { 142 oce, err := newOcExporter(ctx, cfg) 143 if err != nil { 144 return nil, err 145 } 146 oce.metricsClients = make(chan *metricsClientWithCancel, oce.cfg.NumWorkers) 147 return oce, nil 148} 149 150func (oce *ocExporter) pushTraces(_ context.Context, td pdata.Traces) error { 151 // Get first available trace Client. 152 tClient, ok := <-oce.tracesClients 153 if !ok { 154 err := errors.New("failed to push traces, OpenCensus exporter was already stopped") 155 return err 156 } 157 158 // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), 159 // to make sure we don't open more than NumWorkers RPCs at any moment. 160 // Here check if the client is nil and create a new one if that is the case. A nil 161 // object means that an error happened: could not connect, service went down, etc. 162 if tClient == nil { 163 var err error 164 tClient, err = oce.createTraceServiceRPC() 165 if err != nil { 166 // Cannot create an RPC, put back nil to keep the number of workers constant. 167 oce.tracesClients <- nil 168 return err 169 } 170 } 171 172 rss := td.ResourceSpans() 173 for i := 0; i < rss.Len(); i++ { 174 node, resource, spans := internaldata.ResourceSpansToOC(rss.At(i)) 175 // This is a hack because OC protocol expects a Node for the initial message. 176 if node == nil { 177 node = &commonpb.Node{} 178 } 179 if resource == nil { 180 resource = &resourcepb.Resource{} 181 } 182 req := &agenttracepb.ExportTraceServiceRequest{ 183 Spans: spans, 184 Resource: resource, 185 Node: node, 186 } 187 if err := tClient.tsec.Send(req); err != nil { 188 // Error received, cancel the context used to create the RPC to free all resources, 189 // put back nil to keep the number of workers constant. 190 tClient.cancel() 191 oce.tracesClients <- nil 192 return err 193 } 194 } 195 oce.tracesClients <- tClient 196 return nil 197} 198 199func (oce *ocExporter) pushMetrics(_ context.Context, md pdata.Metrics) error { 200 // Get first available mClient. 201 mClient, ok := <-oce.metricsClients 202 if !ok { 203 err := errors.New("failed to push metrics, OpenCensus exporter was already stopped") 204 return err 205 } 206 207 // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), 208 // to make sure we don't open more than NumWorkers RPCs at any moment. 209 // Here check if the client is nil and create a new one if that is the case. A nil 210 // object means that an error happened: could not connect, service went down, etc. 211 if mClient == nil { 212 var err error 213 mClient, err = oce.createMetricsServiceRPC() 214 if err != nil { 215 // Cannot create an RPC, put back nil to keep the number of workers constant. 216 oce.metricsClients <- nil 217 return err 218 } 219 } 220 221 rms := md.ResourceMetrics() 222 for i := 0; i < rms.Len(); i++ { 223 ocReq := agentmetricspb.ExportMetricsServiceRequest{} 224 ocReq.Node, ocReq.Resource, ocReq.Metrics = internaldata.ResourceMetricsToOC(rms.At(i)) 225 226 // This is a hack because OC protocol expects a Node for the initial message. 227 if ocReq.Node == nil { 228 ocReq.Node = &commonpb.Node{} 229 } 230 if ocReq.Resource == nil { 231 ocReq.Resource = &resourcepb.Resource{} 232 } 233 if err := mClient.msec.Send(&ocReq); err != nil { 234 // Error received, cancel the context used to create the RPC to free all resources, 235 // put back nil to keep the number of workers constant. 236 mClient.cancel() 237 oce.metricsClients <- nil 238 return err 239 } 240 } 241 oce.metricsClients <- mClient 242 return nil 243} 244 245func (oce *ocExporter) createTraceServiceRPC() (*tracesClientWithCancel, error) { 246 // Initiate the trace service by sending over node identifier info. 247 ctx, cancel := context.WithCancel(context.Background()) 248 if len(oce.cfg.Headers) > 0 { 249 ctx = metadata.NewOutgoingContext(ctx, metadata.New(oce.cfg.Headers)) 250 } 251 // Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever. 252 traceClient, err := oce.traceSvcClient.Export(ctx) 253 if err != nil { 254 cancel() 255 return nil, fmt.Errorf("TraceServiceClient: %w", err) 256 } 257 return &tracesClientWithCancel{cancel: cancel, tsec: traceClient}, nil 258} 259 260func (oce *ocExporter) createMetricsServiceRPC() (*metricsClientWithCancel, error) { 261 // Initiate the trace service by sending over node identifier info. 262 ctx, cancel := context.WithCancel(context.Background()) 263 if len(oce.cfg.Headers) > 0 { 264 ctx = metadata.NewOutgoingContext(ctx, metadata.New(oce.cfg.Headers)) 265 } 266 // Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever. 267 metricsClient, err := oce.metricsSvcClient.Export(ctx) 268 if err != nil { 269 cancel() 270 return nil, fmt.Errorf("MetricsServiceClient: %w", err) 271 } 272 return &metricsClientWithCancel{cancel: cancel, msec: metricsClient}, nil 273} 274