1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package exporterhelper 16 17import ( 18 "context" 19 "errors" 20 21 "go.opentelemetry.io/collector/component" 22 "go.opentelemetry.io/collector/config" 23 "go.opentelemetry.io/collector/consumer" 24 "go.opentelemetry.io/collector/consumer/consumererror" 25 "go.opentelemetry.io/collector/consumer/consumerhelper" 26 "go.opentelemetry.io/collector/model/pdata" 27) 28 29type tracesRequest struct { 30 baseRequest 31 td pdata.Traces 32 pusher consumerhelper.ConsumeTracesFunc 33} 34 35func newTracesRequest(ctx context.Context, td pdata.Traces, pusher consumerhelper.ConsumeTracesFunc) request { 36 return &tracesRequest{ 37 baseRequest: baseRequest{ctx: ctx}, 38 td: td, 39 pusher: pusher, 40 } 41} 42 43func (req *tracesRequest) onError(err error) request { 44 var traceError consumererror.Traces 45 if consumererror.AsTraces(err, &traceError) { 46 return newTracesRequest(req.ctx, traceError.GetTraces(), req.pusher) 47 } 48 return req 49} 50 51func (req *tracesRequest) export(ctx context.Context) error { 52 return req.pusher(ctx, req.td) 53} 54 55func (req *tracesRequest) count() int { 56 return req.td.SpanCount() 57} 58 59type traceExporter struct { 60 *baseExporter 61 consumer.Traces 62} 63 64// NewTracesExporter creates a TracesExporter that records observability metrics and wraps every request with a Span. 65func NewTracesExporter( 66 cfg config.Exporter, 67 set component.ExporterCreateSettings, 68 pusher consumerhelper.ConsumeTracesFunc, 69 options ...Option, 70) (component.TracesExporter, error) { 71 72 if cfg == nil { 73 return nil, errNilConfig 74 } 75 76 if set.Logger == nil { 77 return nil, errNilLogger 78 } 79 80 if pusher == nil { 81 return nil, errNilPushTraceData 82 } 83 84 bs := fromOptions(options...) 85 be := newBaseExporter(cfg, set.Logger, bs) 86 be.wrapConsumerSender(func(nextSender requestSender) requestSender { 87 return &tracesExporterWithObservability{ 88 obsrep: be.obsrep, 89 nextSender: nextSender, 90 } 91 }) 92 93 tc, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error { 94 req := newTracesRequest(ctx, td, pusher) 95 err := be.sender.send(req) 96 if errors.Is(err, errSendingQueueIsFull) { 97 be.obsrep.recordTracesEnqueueFailure(req.context(), req.count()) 98 } 99 return err 100 }, bs.consumerOptions...) 101 102 return &traceExporter{ 103 baseExporter: be, 104 Traces: tc, 105 }, err 106} 107 108type tracesExporterWithObservability struct { 109 obsrep *obsExporter 110 nextSender requestSender 111} 112 113func (tewo *tracesExporterWithObservability) send(req request) error { 114 req.setContext(tewo.obsrep.StartTracesOp(req.context())) 115 // Forward the data to the next consumer (this pusher is the next). 116 err := tewo.nextSender.send(req) 117 tewo.obsrep.EndTracesOp(req.context(), req.count(), err) 118 return err 119} 120