1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package exporterhelper
16
17import (
18	"context"
19	"errors"
20
21	"go.opentelemetry.io/collector/component"
22	"go.opentelemetry.io/collector/config"
23	"go.opentelemetry.io/collector/consumer"
24	"go.opentelemetry.io/collector/consumer/consumererror"
25	"go.opentelemetry.io/collector/consumer/consumerhelper"
26	"go.opentelemetry.io/collector/model/pdata"
27)
28
29type tracesRequest struct {
30	baseRequest
31	td     pdata.Traces
32	pusher consumerhelper.ConsumeTracesFunc
33}
34
35func newTracesRequest(ctx context.Context, td pdata.Traces, pusher consumerhelper.ConsumeTracesFunc) request {
36	return &tracesRequest{
37		baseRequest: baseRequest{ctx: ctx},
38		td:          td,
39		pusher:      pusher,
40	}
41}
42
43func (req *tracesRequest) onError(err error) request {
44	var traceError consumererror.Traces
45	if consumererror.AsTraces(err, &traceError) {
46		return newTracesRequest(req.ctx, traceError.GetTraces(), req.pusher)
47	}
48	return req
49}
50
51func (req *tracesRequest) export(ctx context.Context) error {
52	return req.pusher(ctx, req.td)
53}
54
55func (req *tracesRequest) count() int {
56	return req.td.SpanCount()
57}
58
59type traceExporter struct {
60	*baseExporter
61	consumer.Traces
62}
63
64// NewTracesExporter creates a TracesExporter that records observability metrics and wraps every request with a Span.
65func NewTracesExporter(
66	cfg config.Exporter,
67	set component.ExporterCreateSettings,
68	pusher consumerhelper.ConsumeTracesFunc,
69	options ...Option,
70) (component.TracesExporter, error) {
71
72	if cfg == nil {
73		return nil, errNilConfig
74	}
75
76	if set.Logger == nil {
77		return nil, errNilLogger
78	}
79
80	if pusher == nil {
81		return nil, errNilPushTraceData
82	}
83
84	bs := fromOptions(options...)
85	be := newBaseExporter(cfg, set.Logger, bs)
86	be.wrapConsumerSender(func(nextSender requestSender) requestSender {
87		return &tracesExporterWithObservability{
88			obsrep:     be.obsrep,
89			nextSender: nextSender,
90		}
91	})
92
93	tc, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error {
94		req := newTracesRequest(ctx, td, pusher)
95		err := be.sender.send(req)
96		if errors.Is(err, errSendingQueueIsFull) {
97			be.obsrep.recordTracesEnqueueFailure(req.context(), req.count())
98		}
99		return err
100	}, bs.consumerOptions...)
101
102	return &traceExporter{
103		baseExporter: be,
104		Traces:       tc,
105	}, err
106}
107
108type tracesExporterWithObservability struct {
109	obsrep     *obsExporter
110	nextSender requestSender
111}
112
113func (tewo *tracesExporterWithObservability) send(req request) error {
114	req.setContext(tewo.obsrep.StartTracesOp(req.context()))
115	// Forward the data to the next consumer (this pusher is the next).
116	err := tewo.nextSender.send(req)
117	tewo.obsrep.EndTracesOp(req.context(), req.count(), err)
118	return err
119}
120