1// Copyright 2018, OpenCensus Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package ocagent 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "io" 22 "sync" 23 "time" 24 "unsafe" 25 26 "google.golang.org/api/support/bundler" 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/credentials" 29 "google.golang.org/grpc/metadata" 30 31 "go.opencensus.io/plugin/ocgrpc" 32 "go.opencensus.io/resource" 33 "go.opencensus.io/stats/view" 34 "go.opencensus.io/trace" 35 36 commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" 37 agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" 38 agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" 39 metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" 40 resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" 41 tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" 42) 43 44var startupMu sync.Mutex 45var startTime time.Time 46 47func init() { 48 startupMu.Lock() 49 startTime = time.Now() 50 startupMu.Unlock() 51} 52 53var _ trace.Exporter = (*Exporter)(nil) 54var _ view.Exporter = (*Exporter)(nil) 55 56type Exporter struct { 57 // mu protects the non-atomic and non-channel variables 58 mu sync.RWMutex 59 // senderMu protects the concurrent unsafe send on traceExporter client 60 senderMu sync.Mutex 61 // recvMu protects the concurrent unsafe recv on traceExporter client 62 recvMu sync.Mutex 63 started bool 64 stopped bool 65 agentAddress string 66 serviceName string 67 canDialInsecure bool 68 traceExporter agenttracepb.TraceService_ExportClient 69 metricsExporter agentmetricspb.MetricsService_ExportClient 70 nodeInfo *commonpb.Node 71 grpcClientConn *grpc.ClientConn 72 reconnectionPeriod time.Duration 73 resourceDetector resource.Detector 74 resource *resourcepb.Resource 75 compressor string 76 headers map[string]string 77 lastConnectErrPtr unsafe.Pointer 78 79 startOnce sync.Once 80 stopCh chan bool 81 disconnectedCh chan bool 82 83 backgroundConnectionDoneCh chan bool 84 85 traceBundler *bundler.Bundler 86 87 // viewDataBundler is the bundler to enable conversion 88 // from OpenCensus-Go view.Data to metricspb.Metric. 89 // Please do not confuse it with metricsBundler! 90 viewDataBundler *bundler.Bundler 91 92 clientTransportCredentials credentials.TransportCredentials 93 94 grpcDialOptions []grpc.DialOption 95} 96 97func NewExporter(opts ...ExporterOption) (*Exporter, error) { 98 exp, err := NewUnstartedExporter(opts...) 99 if err != nil { 100 return nil, err 101 } 102 if err := exp.Start(); err != nil { 103 return nil, err 104 } 105 return exp, nil 106} 107 108const spanDataBufferSize = 300 109 110func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) { 111 e := new(Exporter) 112 for _, opt := range opts { 113 opt.withExporter(e) 114 } 115 traceBundler := bundler.NewBundler((*trace.SpanData)(nil), func(bundle interface{}) { 116 e.uploadTraces(bundle.([]*trace.SpanData)) 117 }) 118 traceBundler.DelayThreshold = 2 * time.Second 119 traceBundler.BundleCountThreshold = spanDataBufferSize 120 e.traceBundler = traceBundler 121 122 viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) { 123 e.uploadViewData(bundle.([]*view.Data)) 124 }) 125 viewDataBundler.DelayThreshold = 2 * time.Second 126 viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable. 127 e.viewDataBundler = viewDataBundler 128 e.nodeInfo = NodeWithStartTime(e.serviceName) 129 if e.resourceDetector != nil { 130 res, err := e.resourceDetector(context.Background()) 131 if err != nil { 132 panic(fmt.Sprintf("Error detecting resource. err:%v\n", err)) 133 } 134 if res != nil { 135 e.resource = resourceToResourcePb(res) 136 } 137 } else { 138 e.resource = resourceProtoFromEnv() 139 } 140 141 return e, nil 142} 143 144const ( 145 maxInitialConfigRetries = 10 146 maxInitialTracesRetries = 10 147) 148 149var ( 150 errAlreadyStarted = errors.New("already started") 151 errNotStarted = errors.New("not started") 152 errStopped = errors.New("stopped") 153) 154 155// Start dials to the agent, establishing a connection to it. It also 156// initiates the Config and Trace services by sending over the initial 157// messages that consist of the node identifier. Start invokes a background 158// connector that will reattempt connections to the agent periodically 159// if the connection dies. 160func (ae *Exporter) Start() error { 161 var err = errAlreadyStarted 162 ae.startOnce.Do(func() { 163 ae.mu.Lock() 164 ae.started = true 165 ae.disconnectedCh = make(chan bool, 1) 166 ae.stopCh = make(chan bool) 167 ae.backgroundConnectionDoneCh = make(chan bool) 168 ae.mu.Unlock() 169 170 // An optimistic first connection attempt to ensure that 171 // applications under heavy load can immediately process 172 // data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63 173 if err := ae.connect(); err == nil { 174 ae.setStateConnected() 175 } else { 176 ae.setStateDisconnected(err) 177 } 178 go ae.indefiniteBackgroundConnection() 179 180 err = nil 181 }) 182 183 return err 184} 185 186func (ae *Exporter) prepareAgentAddress() string { 187 if ae.agentAddress != "" { 188 return ae.agentAddress 189 } 190 return fmt.Sprintf("%s:%d", DefaultAgentHost, DefaultAgentPort) 191} 192 193func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error { 194 ae.mu.RLock() 195 started := ae.started 196 nodeInfo := ae.nodeInfo 197 ae.mu.RUnlock() 198 199 if !started { 200 return errNotStarted 201 } 202 203 ae.mu.Lock() 204 // If the previous clientConn was non-nil, close it 205 if ae.grpcClientConn != nil { 206 _ = ae.grpcClientConn.Close() 207 } 208 ae.grpcClientConn = cc 209 ae.mu.Unlock() 210 211 if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil { 212 return err 213 } 214 215 return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo) 216} 217 218func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error { 219 // Initiate the trace service by sending over node identifier info. 220 traceSvcClient := agenttracepb.NewTraceServiceClient(cc) 221 ctx := context.Background() 222 if len(ae.headers) > 0 { 223 ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers)) 224 } 225 traceExporter, err := traceSvcClient.Export(ctx) 226 if err != nil { 227 return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err) 228 } 229 230 firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{ 231 Node: node, 232 Resource: ae.resource, 233 } 234 if err := traceExporter.Send(firstTraceMessage); err != nil { 235 return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err) 236 } 237 238 ae.mu.Lock() 239 ae.traceExporter = traceExporter 240 ae.mu.Unlock() 241 242 // Initiate the config service by sending over node identifier info. 243 configStream, err := traceSvcClient.Config(context.Background()) 244 if err != nil { 245 return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err) 246 } 247 firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node} 248 if err := configStream.Send(firstCfgMessage); err != nil { 249 return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err) 250 } 251 252 // In the background, handle trace configurations that are beamed down 253 // by the agent, but also reply to it with the applied configuration. 254 go ae.handleConfigStreaming(configStream) 255 256 return nil 257} 258 259func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error { 260 metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc) 261 metricsExporter, err := metricsSvcClient.Export(context.Background()) 262 if err != nil { 263 return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err) 264 } 265 // Initiate the metrics service by sending over the first message just containing the Node and Resource. 266 firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{ 267 Node: node, 268 Resource: ae.resource, 269 } 270 if err := metricsExporter.Send(firstMetricsMessage); err != nil { 271 return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err) 272 } 273 274 ae.mu.Lock() 275 ae.metricsExporter = metricsExporter 276 ae.mu.Unlock() 277 278 // With that we are good to go and can start sending metrics 279 return nil 280} 281 282func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) { 283 addr := ae.prepareAgentAddress() 284 var dialOpts []grpc.DialOption 285 if ae.clientTransportCredentials != nil { 286 dialOpts = append(dialOpts, grpc.WithTransportCredentials(ae.clientTransportCredentials)) 287 } else if ae.canDialInsecure { 288 dialOpts = append(dialOpts, grpc.WithInsecure()) 289 } 290 if ae.compressor != "" { 291 dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(ae.compressor))) 292 } 293 dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{})) 294 if len(ae.grpcDialOptions) != 0 { 295 dialOpts = append(dialOpts, ae.grpcDialOptions...) 296 } 297 298 ctx := context.Background() 299 if len(ae.headers) > 0 { 300 ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers)) 301 } 302 return grpc.DialContext(ctx, addr, dialOpts...) 303} 304 305func (ae *Exporter) handleConfigStreaming(configStream agenttracepb.TraceService_ConfigClient) error { 306 // Note: We haven't yet implemented configuration sending so we 307 // should NOT be changing connection states within this function for now. 308 for { 309 recv, err := configStream.Recv() 310 if err != nil { 311 // TODO: Check if this is a transient error or exponential backoff-able. 312 return err 313 } 314 cfg := recv.Config 315 if cfg == nil { 316 continue 317 } 318 319 // Otherwise now apply the trace configuration sent down from the agent 320 if psamp := cfg.GetProbabilitySampler(); psamp != nil { 321 trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(psamp.SamplingProbability)}) 322 } else if csamp := cfg.GetConstantSampler(); csamp != nil { 323 alwaysSample := csamp.Decision == tracepb.ConstantSampler_ALWAYS_ON 324 if alwaysSample { 325 trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()}) 326 } else { 327 trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()}) 328 } 329 } else { // TODO: Add the rate limiting sampler here 330 } 331 332 // Then finally send back to upstream the newly applied configuration 333 err = configStream.Send(&agenttracepb.CurrentLibraryConfig{Config: &tracepb.TraceConfig{Sampler: cfg.Sampler}}) 334 if err != nil { 335 return err 336 } 337 } 338} 339 340// Stop shuts down all the connections and resources 341// related to the exporter. 342func (ae *Exporter) Stop() error { 343 ae.mu.RLock() 344 cc := ae.grpcClientConn 345 started := ae.started 346 stopped := ae.stopped 347 ae.mu.RUnlock() 348 349 if !started { 350 return errNotStarted 351 } 352 if stopped { 353 // TODO: tell the user that we've already stopped, so perhaps a sentinel error? 354 return nil 355 } 356 357 ae.Flush() 358 359 // Now close the underlying gRPC connection. 360 var err error 361 if cc != nil { 362 err = cc.Close() 363 } 364 365 // At this point we can change the state variables: started and stopped 366 ae.mu.Lock() 367 ae.started = false 368 ae.stopped = true 369 ae.mu.Unlock() 370 close(ae.stopCh) 371 372 // Ensure that the backgroundConnector returns 373 <-ae.backgroundConnectionDoneCh 374 375 return err 376} 377 378func (ae *Exporter) ExportSpan(sd *trace.SpanData) { 379 if sd == nil { 380 return 381 } 382 _ = ae.traceBundler.Add(sd, 1) 383} 384 385func (ae *Exporter) ExportTraceServiceRequest(batch *agenttracepb.ExportTraceServiceRequest) error { 386 if batch == nil || len(batch.Spans) == 0 { 387 return nil 388 } 389 390 select { 391 case <-ae.stopCh: 392 return errStopped 393 394 default: 395 if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil { 396 return fmt.Errorf("ExportTraceServiceRequest: no active connection, last connection error: %v", lastConnectErr) 397 } 398 399 ae.senderMu.Lock() 400 err := ae.traceExporter.Send(batch) 401 ae.senderMu.Unlock() 402 if err != nil { 403 if err == io.EOF { 404 ae.recvMu.Lock() 405 // Perform a .Recv to try to find out why the RPC actually ended. 406 // See: 407 // * https://github.com/grpc/grpc-go/blob/d389f9fac68eea0dcc49957d0b4cca5b3a0a7171/stream.go#L98-L100 408 // * https://groups.google.com/forum/#!msg/grpc-io/XcN4hA9HonI/F_UDiejTAwAJ 409 for { 410 _, err = ae.traceExporter.Recv() 411 if err != nil { 412 break 413 } 414 } 415 ae.recvMu.Unlock() 416 } 417 418 ae.setStateDisconnected(err) 419 if err != io.EOF { 420 return err 421 } 422 } 423 return nil 424 } 425} 426 427func (ae *Exporter) ExportView(vd *view.Data) { 428 if vd == nil { 429 return 430 } 431 _ = ae.viewDataBundler.Add(vd, 1) 432} 433 434// ExportMetricsServiceRequest sends proto metrics with the metrics service client. 435func (ae *Exporter) ExportMetricsServiceRequest(batch *agentmetricspb.ExportMetricsServiceRequest) error { 436 if batch == nil || len(batch.Metrics) == 0 { 437 return nil 438 } 439 440 select { 441 case <-ae.stopCh: 442 return errStopped 443 444 default: 445 if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil { 446 return fmt.Errorf("ExportMetricsServiceRequest: no active connection, last connection error: %v", lastConnectErr) 447 } 448 449 ae.senderMu.Lock() 450 err := ae.metricsExporter.Send(batch) 451 ae.senderMu.Unlock() 452 if err != nil { 453 if err == io.EOF { 454 ae.recvMu.Lock() 455 // Perform a .Recv to try to find out why the RPC actually ended. 456 // See: 457 // * https://github.com/grpc/grpc-go/blob/d389f9fac68eea0dcc49957d0b4cca5b3a0a7171/stream.go#L98-L100 458 // * https://groups.google.com/forum/#!msg/grpc-io/XcN4hA9HonI/F_UDiejTAwAJ 459 for { 460 _, err = ae.metricsExporter.Recv() 461 if err != nil { 462 break 463 } 464 } 465 ae.recvMu.Unlock() 466 } 467 468 ae.setStateDisconnected(err) 469 if err != io.EOF { 470 return err 471 } 472 } 473 return nil 474 } 475} 476 477func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span { 478 if len(sdl) == 0 { 479 return nil 480 } 481 protoSpans := make([]*tracepb.Span, 0, len(sdl)) 482 for _, sd := range sdl { 483 if sd != nil { 484 protoSpans = append(protoSpans, ocSpanToProtoSpan(sd)) 485 } 486 } 487 return protoSpans 488} 489 490func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) { 491 select { 492 case <-ae.stopCh: 493 return 494 495 default: 496 if !ae.connected() { 497 return 498 } 499 500 protoSpans := ocSpanDataToPbSpans(sdl) 501 if len(protoSpans) == 0 { 502 return 503 } 504 ae.senderMu.Lock() 505 err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{ 506 Spans: protoSpans, 507 Resource: resourceProtoFromEnv(), 508 }) 509 ae.senderMu.Unlock() 510 if err != nil { 511 ae.setStateDisconnected(err) 512 } 513 } 514} 515 516func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric { 517 if len(vdl) == 0 { 518 return nil 519 } 520 metrics := make([]*metricspb.Metric, 0, len(vdl)) 521 for _, vd := range vdl { 522 if vd != nil { 523 vmetric, err := viewDataToMetric(vd) 524 // TODO: (@odeke-em) somehow report this error, if it is non-nil. 525 if err == nil && vmetric != nil { 526 metrics = append(metrics, vmetric) 527 } 528 } 529 } 530 return metrics 531} 532 533func (ae *Exporter) uploadViewData(vdl []*view.Data) { 534 protoMetrics := ocViewDataToPbMetrics(vdl) 535 if len(protoMetrics) == 0 { 536 return 537 } 538 req := &agentmetricspb.ExportMetricsServiceRequest{ 539 Metrics: protoMetrics, 540 Resource: resourceProtoFromEnv(), 541 // TODO:(@odeke-em) 542 // a) Figure out how to derive a Node from the environment 543 // or better letting users of the exporter configure it. 544 } 545 ae.ExportMetricsServiceRequest(req) 546} 547 548func (ae *Exporter) Flush() { 549 ae.traceBundler.Flush() 550 ae.viewDataBundler.Flush() 551} 552 553func resourceProtoFromEnv() *resourcepb.Resource { 554 rs, _ := resource.FromEnv(context.Background()) 555 if rs == nil { 556 return nil 557 } 558 return resourceToResourcePb(rs) 559} 560 561func resourceToResourcePb(rs *resource.Resource) *resourcepb.Resource { 562 rprs := &resourcepb.Resource{ 563 Type: rs.Type, 564 } 565 if rs.Labels != nil { 566 rprs.Labels = make(map[string]string) 567 for k, v := range rs.Labels { 568 rprs.Labels[k] = v 569 } 570 } 571 return rprs 572} 573