1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package otlp // import "go.opentelemetry.io/otel/exporters/otlp" 16 17// This code was based on 18// contrib.go.opencensus.io/exporter/ocagent/connection.go 19 20import ( 21 "context" 22 "errors" 23 "fmt" 24 "sync" 25 "unsafe" 26 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/metadata" 29 30 colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" 31 coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" 32 "go.opentelemetry.io/otel/exporters/otlp/internal/transform" 33 "go.opentelemetry.io/otel/metric" 34 metricsdk "go.opentelemetry.io/otel/sdk/export/metric" 35 "go.opentelemetry.io/otel/sdk/export/metric/aggregation" 36 tracesdk "go.opentelemetry.io/otel/sdk/export/trace" 37) 38 39// Exporter is an OpenTelemetry exporter. It exports both traces and metrics 40// from OpenTelemetry instrumented to code using OpenTelemetry protocol 41// buffers to a configurable receiver. 42type Exporter struct { 43 // mu protects the non-atomic and non-channel variables 44 mu sync.RWMutex 45 // senderMu protects the concurrent unsafe sends on the shared gRPC client connection. 46 senderMu sync.Mutex 47 started bool 48 traceExporter coltracepb.TraceServiceClient 49 metricExporter colmetricpb.MetricsServiceClient 50 grpcClientConn *grpc.ClientConn 51 lastConnectErrPtr unsafe.Pointer 52 53 startOnce sync.Once 54 stopOnce sync.Once 55 stopCh chan struct{} 56 disconnectedCh chan bool 57 58 backgroundConnectionDoneCh chan bool 59 60 c config 61 metadata metadata.MD 62} 63 64var _ tracesdk.SpanExporter = (*Exporter)(nil) 65var _ metricsdk.Exporter = (*Exporter)(nil) 66 67// newConfig initializes a config struct with default values and applies 68// any ExporterOptions provided. 69func newConfig(opts ...ExporterOption) config { 70 cfg := config{ 71 grpcServiceConfig: DefaultGRPCServiceConfig, 72 73 // Note: the default ExportKindSelector is specified 74 // as Cumulative: 75 // https://github.com/open-telemetry/opentelemetry-specification/issues/731 76 exportKindSelector: metricsdk.CumulativeExportKindSelector(), 77 } 78 for _, opt := range opts { 79 opt(&cfg) 80 } 81 return cfg 82} 83 84// NewExporter constructs a new Exporter and starts it. 85func NewExporter(opts ...ExporterOption) (*Exporter, error) { 86 exp := NewUnstartedExporter(opts...) 87 if err := exp.Start(); err != nil { 88 return nil, err 89 } 90 return exp, nil 91} 92 93// NewUnstartedExporter constructs a new Exporter and does not start it. 94func NewUnstartedExporter(opts ...ExporterOption) *Exporter { 95 e := new(Exporter) 96 e.c = newConfig(opts...) 97 if len(e.c.headers) > 0 { 98 e.metadata = metadata.New(e.c.headers) 99 } 100 return e 101} 102 103var ( 104 errAlreadyStarted = errors.New("already started") 105 errNotStarted = errors.New("not started") 106 errDisconnected = errors.New("exporter disconnected") 107 errStopped = errors.New("exporter stopped") 108 errContextCanceled = errors.New("context canceled") 109) 110 111// Start dials to the collector, establishing a connection to it. It also 112// initiates the Config and Trace services by sending over the initial 113// messages that consist of the node identifier. Start invokes a background 114// connector that will reattempt connections to the collector periodically 115// if the connection dies. 116func (e *Exporter) Start() error { 117 var err = errAlreadyStarted 118 e.startOnce.Do(func() { 119 e.mu.Lock() 120 e.started = true 121 e.disconnectedCh = make(chan bool, 1) 122 e.stopCh = make(chan struct{}) 123 e.backgroundConnectionDoneCh = make(chan bool) 124 e.mu.Unlock() 125 126 // An optimistic first connection attempt to ensure that 127 // applications under heavy load can immediately process 128 // data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63 129 if err := e.connect(); err == nil { 130 e.setStateConnected() 131 } else { 132 e.setStateDisconnected(err) 133 } 134 go e.indefiniteBackgroundConnection() 135 136 err = nil 137 }) 138 139 return err 140} 141 142func (e *Exporter) prepareCollectorAddress() string { 143 if e.c.collectorAddr != "" { 144 return e.c.collectorAddr 145 } 146 return fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort) 147} 148 149func (e *Exporter) enableConnections(cc *grpc.ClientConn) error { 150 e.mu.RLock() 151 started := e.started 152 e.mu.RUnlock() 153 154 if !started { 155 return errNotStarted 156 } 157 158 e.mu.Lock() 159 // If previous clientConn is same as the current then just return. 160 // This doesn't happen right now as this func is only called with new ClientConn. 161 // It is more about future-proofing. 162 if e.grpcClientConn == cc { 163 e.mu.Unlock() 164 return nil 165 } 166 // If the previous clientConn was non-nil, close it 167 if e.grpcClientConn != nil { 168 _ = e.grpcClientConn.Close() 169 } 170 e.grpcClientConn = cc 171 e.traceExporter = coltracepb.NewTraceServiceClient(cc) 172 e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) 173 e.mu.Unlock() 174 175 return nil 176} 177 178func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context { 179 if e.metadata.Len() > 0 { 180 return metadata.NewOutgoingContext(ctx, e.metadata) 181 } 182 return ctx 183} 184 185func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { 186 addr := e.prepareCollectorAddress() 187 188 dialOpts := []grpc.DialOption{} 189 if e.c.grpcServiceConfig != "" { 190 dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(e.c.grpcServiceConfig)) 191 } 192 if e.c.clientCredentials != nil { 193 dialOpts = append(dialOpts, grpc.WithTransportCredentials(e.c.clientCredentials)) 194 } else if e.c.canDialInsecure { 195 dialOpts = append(dialOpts, grpc.WithInsecure()) 196 } 197 if e.c.compressor != "" { 198 dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(e.c.compressor))) 199 } 200 if len(e.c.grpcDialOptions) != 0 { 201 dialOpts = append(dialOpts, e.c.grpcDialOptions...) 202 } 203 204 ctx := e.contextWithMetadata(context.Background()) 205 return grpc.DialContext(ctx, addr, dialOpts...) 206} 207 208// closeStopCh is used to wrap the exporters stopCh channel closing for testing. 209var closeStopCh = func(stopCh chan struct{}) { 210 close(stopCh) 211} 212 213// Shutdown closes all connections and releases resources currently being used 214// by the exporter. If the exporter is not started this does nothing. 215func (e *Exporter) Shutdown(ctx context.Context) error { 216 e.mu.RLock() 217 cc := e.grpcClientConn 218 started := e.started 219 e.mu.RUnlock() 220 221 if !started { 222 return nil 223 } 224 225 var err error 226 227 e.stopOnce.Do(func() { 228 if cc != nil { 229 // Clean things up before checking this error. 230 err = cc.Close() 231 } 232 233 // At this point we can change the state variable started 234 e.mu.Lock() 235 e.started = false 236 e.mu.Unlock() 237 closeStopCh(e.stopCh) 238 239 // Ensure that the backgroundConnector returns 240 select { 241 case <-e.backgroundConnectionDoneCh: 242 case <-ctx.Done(): 243 err = ctx.Err() 244 } 245 }) 246 247 return err 248} 249 250// Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter 251// interface. It transforms and batches metric Records into OTLP Metrics and 252// transmits them to the configured collector. 253func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error { 254 // Unify the parent context Done signal with the exporter stopCh. 255 ctx, cancel := context.WithCancel(parent) 256 defer cancel() 257 go func(ctx context.Context, cancel context.CancelFunc) { 258 select { 259 case <-ctx.Done(): 260 case <-e.stopCh: 261 cancel() 262 } 263 }(ctx, cancel) 264 265 // Hardcode the number of worker goroutines to 1. We later will 266 // need to see if there's a way to adjust that number for longer 267 // running operations. 268 rms, err := transform.CheckpointSet(ctx, e, cps, 1) 269 if err != nil { 270 return err 271 } 272 273 if !e.connected() { 274 return errDisconnected 275 } 276 277 select { 278 case <-e.stopCh: 279 return errStopped 280 case <-ctx.Done(): 281 return errContextCanceled 282 default: 283 e.senderMu.Lock() 284 _, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ 285 ResourceMetrics: rms, 286 }) 287 e.senderMu.Unlock() 288 if err != nil { 289 return err 290 } 291 } 292 return nil 293} 294 295// ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter 296// metric telemetry that it needs to be provided in a cumulative format. 297func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricsdk.ExportKind { 298 return e.c.exportKindSelector.ExportKindFor(desc, kind) 299} 300 301// ExportSpans exports a batch of SpanData. 302func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) error { 303 return e.uploadTraces(ctx, sds) 304} 305 306func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) error { 307 select { 308 case <-e.stopCh: 309 return nil 310 default: 311 if !e.connected() { 312 return nil 313 } 314 315 protoSpans := transform.SpanData(sdl) 316 if len(protoSpans) == 0 { 317 return nil 318 } 319 320 e.senderMu.Lock() 321 _, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ 322 ResourceSpans: protoSpans, 323 }) 324 e.senderMu.Unlock() 325 if err != nil { 326 e.setStateDisconnected(err) 327 return err 328 } 329 } 330 return nil 331} 332