1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package internal
16
17import (
18	"context"
19	"errors"
20	"math"
21	"net"
22	"sync/atomic"
23
24	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
25	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
26	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
27	"github.com/prometheus/common/model"
28	"github.com/prometheus/prometheus/pkg/exemplar"
29	"github.com/prometheus/prometheus/pkg/labels"
30	"github.com/prometheus/prometheus/storage"
31	"go.uber.org/zap"
32	"google.golang.org/protobuf/types/known/timestamppb"
33
34	"go.opentelemetry.io/collector/config"
35	"go.opentelemetry.io/collector/consumer"
36	"go.opentelemetry.io/collector/obsreport"
37	"go.opentelemetry.io/collector/translator/internaldata"
38)
39
// Label keys of the resource built for every scrape target.
const (
	jobAttr      = "job"
	instanceAttr = "instance"
	portAttr     = "port"
	schemeAttr   = "scheme"
)

// Transport and data format reported to obsreport for each committed scrape.
const (
	transport  = "http"
	dataformat = "prometheus"
)
49
// Sentinel errors used while processing a scrape transaction.
var (
	errMetricNameNotFound = errors.New("metricName not found from labels")
	errTransactionAborted = errors.New("transaction aborted")
	errNoJobInstance      = errors.New("job or instance cannot be found from labels")
	errNoStartTimeMetrics = errors.New("process_start_time_seconds metric is missing")
)
54
// A transaction corresponds to a single scrape operation or staleness report.
// Whenever the prometheus receiver scrapes a target's metrics endpoint, a page of
// raw metrics is returned and a transaction, acting as a storage.Appender, is
// created to process that page. The scrape loop calls Append to insert data
// points. When it is finished, it calls either Commit, which builds the buffered
// points and flushes them to the downstream consumer, or Rollback, after which
// the buffered points are never sent.
type transaction struct {
	// Sequence number taken from idSeq in newTransaction.
	id                   int64
	// Append fails with errTransactionAborted once this context is done; Commit
	// starts its obsreport operation from it.
	ctx                  context.Context
	// True until the first accepted sample initializes the transaction
	// (see initTransaction).
	isNew                bool
	// Downstream consumer that receives the committed metrics.
	sink                 consumer.Metrics
	// Target identity; recorded only when jobsMap is non-nil and used by Commit
	// to look up the metrics-adjuster state for this target.
	job                  string
	instance             string
	// Per-target adjuster state, used when useStartTimeMetric is false.
	jobsMap              *JobsMap
	// When true, Commit stamps start timestamps from the builder's startTime
	// instead of running the jobsMap-based metrics adjuster.
	useStartTimeMetric   bool
	// Passed through to newMetricBuilder; presumably selects the start-time
	// metric by name — confirm in metricBuilder.
	startTimeMetricRegex string
	// ID of the owning receiver, used for obsreport.
	receiverID           config.ComponentID
	// Resolves target metadata by job and instance.
	ms                   *metadataService
	// Describe the scrape target; built in initTransaction.
	node                 *commonpb.Node
	resource             *resourcepb.Resource
	// Accumulates the data points of this scrape; created in initTransaction.
	metricBuilder        *metricBuilder
	// Labels appended to every sample's label set in Append.
	externalLabels       labels.Labels
	logger               *zap.Logger
	// Records receiver observability for each Commit.
	obsrecv              *obsreport.Receiver
	// Shared with the metric builder; Commit emits staleness markers from it.
	stalenessStore       *stalenessStore
	// Timestamp of the first sample of the current scrape, or -1 when unset.
	startTimeMs          int64
}
82
83func newTransaction(
84	ctx context.Context,
85	jobsMap *JobsMap,
86	useStartTimeMetric bool,
87	startTimeMetricRegex string,
88	receiverID config.ComponentID,
89	ms *metadataService,
90	sink consumer.Metrics,
91	externalLabels labels.Labels,
92	logger *zap.Logger, stalenessStore *stalenessStore) *transaction {
93	return &transaction{
94		id:                   atomic.AddInt64(&idSeq, 1),
95		ctx:                  ctx,
96		isNew:                true,
97		sink:                 sink,
98		jobsMap:              jobsMap,
99		useStartTimeMetric:   useStartTimeMetric,
100		startTimeMetricRegex: startTimeMetricRegex,
101		receiverID:           receiverID,
102		ms:                   ms,
103		externalLabels:       externalLabels,
104		logger:               logger,
105		obsrecv:              obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiverID, Transport: transport}),
106		stalenessStore:       stalenessStore,
107		startTimeMs:          -1,
108	}
109}
110
111// ensure *transaction has implemented the storage.Appender interface
112var _ storage.Appender = (*transaction)(nil)
113
114// Append always returns 0 to disable label caching.
115func (tr *transaction) Append(ref uint64, ls labels.Labels, t int64, v float64) (uint64, error) {
116	if tr.startTimeMs < 0 {
117		tr.startTimeMs = t
118	}
119	// Important, must handle. prometheus will still try to feed the appender some data even if it failed to
120	// scrape the remote target,  if the previous scrape was success and some data were cached internally
121	// in our case, we don't need these data, simply drop them shall be good enough. more details:
122	// https://github.com/prometheus/prometheus/blob/851131b0740be7291b98f295567a97f32fffc655/scrape/scrape.go#L933-L935
123	if math.IsNaN(v) {
124		return 0, nil
125	}
126
127	select {
128	case <-tr.ctx.Done():
129		return 0, errTransactionAborted
130	default:
131	}
132	if len(tr.externalLabels) > 0 {
133		// TODO(jbd): Improve the allocs.
134		ls = append(ls, tr.externalLabels...)
135	}
136	if tr.isNew {
137		if err := tr.initTransaction(ls); err != nil {
138			return 0, err
139		}
140	}
141
142	return 0, tr.metricBuilder.AddDataPoint(ls, t, v)
143}
144
145func (tr *transaction) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
146	return 0, nil
147}
148
149// AddFast always returns error since caching is not supported by Add() function.
150func (tr *transaction) AddFast(_ uint64, _ int64, _ float64) error {
151	return storage.ErrNotFound
152}
153
154func (tr *transaction) initTransaction(ls labels.Labels) error {
155	job, instance := ls.Get(model.JobLabel), ls.Get(model.InstanceLabel)
156	if job == "" || instance == "" {
157		return errNoJobInstance
158	}
159	// discover the binding target when this method is called for the first time during a transaction
160	mc, err := tr.ms.Get(job, instance)
161	if err != nil {
162		return err
163	}
164	if tr.jobsMap != nil {
165		tr.job = job
166		tr.instance = instance
167	}
168	tr.node, tr.resource = createNodeAndResource(job, instance, mc.SharedLabels().Get(model.SchemeLabel))
169	tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger, tr.stalenessStore)
170	tr.isNew = false
171	return nil
172}
173
174// Commit submits metrics data to consumers.
175func (tr *transaction) Commit() error {
176	if tr.isNew {
177		// In a situation like not able to connect to the remote server, scrapeloop will still commit even if it had
178		// never added any data points, that the transaction has not been initialized.
179		return nil
180	}
181
182	// Before building metrics, issue staleness markers for every stale metric.
183	staleLabels := tr.stalenessStore.emitStaleLabels()
184
185	for _, sEntry := range staleLabels {
186		tr.metricBuilder.AddDataPoint(sEntry.labels, sEntry.seenAtMs, stalenessSpecialValue)
187	}
188
189	tr.startTimeMs = -1
190
191	ctx := tr.obsrecv.StartMetricsOp(tr.ctx)
192	metrics, _, _, err := tr.metricBuilder.Build()
193	if err != nil {
194		// Only error by Build() is errNoDataToBuild, with numReceivedPoints set to zero.
195		tr.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
196		return err
197	}
198
199	if tr.useStartTimeMetric {
200		// startTime is mandatory in this case, but may be zero when the
201		// process_start_time_seconds metric is missing from the target endpoint.
202		if tr.metricBuilder.startTime == 0.0 {
203			// Since we are unable to adjust metrics properly, we will drop them
204			// and return an error.
205			err = errNoStartTimeMetrics
206			tr.obsrecv.EndMetricsOp(ctx, dataformat, 0, err)
207			return err
208		}
209
210		adjustStartTimestamp(tr.metricBuilder.startTime, metrics)
211	} else {
212		// AdjustMetrics - jobsMap has to be non-nil in this case.
213		// Note: metrics could be empty after adjustment, which needs to be checked before passing it on to ConsumeMetrics()
214		metrics, _ = NewMetricsAdjuster(tr.jobsMap.get(tr.job, tr.instance), tr.logger).AdjustMetrics(metrics)
215	}
216
217	numPoints := 0
218	if len(metrics) > 0 {
219		md := internaldata.OCToMetrics(tr.node, tr.resource, metrics)
220		numPoints = md.DataPointCount()
221		err = tr.sink.ConsumeMetrics(ctx, md)
222	}
223	tr.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err)
224	return err
225}
226
227func (tr *transaction) Rollback() error {
228	tr.startTimeMs = -1
229	return nil
230}
231
232func adjustStartTimestamp(startTime float64, metrics []*metricspb.Metric) {
233	startTimeTs := timestampFromFloat64(startTime)
234	for _, metric := range metrics {
235		switch metric.GetMetricDescriptor().GetType() {
236		case metricspb.MetricDescriptor_GAUGE_DOUBLE, metricspb.MetricDescriptor_GAUGE_DISTRIBUTION:
237			continue
238		default:
239			for _, ts := range metric.GetTimeseries() {
240				ts.StartTimestamp = startTimeTs
241			}
242		}
243	}
244}
245
246func timestampFromFloat64(ts float64) *timestamppb.Timestamp {
247	secs := int64(ts)
248	nanos := int64((ts - float64(secs)) * 1e9)
249	return &timestamppb.Timestamp{
250		Seconds: secs,
251		Nanos:   int32(nanos),
252	}
253}
254
255func createNodeAndResource(job, instance, scheme string) (*commonpb.Node, *resourcepb.Resource) {
256	host, port, err := net.SplitHostPort(instance)
257	if err != nil {
258		host = instance
259	}
260	node := &commonpb.Node{
261		ServiceInfo: &commonpb.ServiceInfo{Name: job},
262		Identifier: &commonpb.ProcessIdentifier{
263			HostName: host,
264		},
265	}
266	resource := &resourcepb.Resource{
267		Labels: map[string]string{
268			jobAttr:      job,
269			instanceAttr: instance,
270			portAttr:     port,
271			schemeAttr:   scheme,
272		},
273	}
274	return node, resource
275}
276