1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package internal 16 17import ( 18 "context" 19 "errors" 20 "math" 21 "net" 22 "sync/atomic" 23 24 commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" 25 metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" 26 resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" 27 "github.com/prometheus/common/model" 28 "github.com/prometheus/prometheus/pkg/exemplar" 29 "github.com/prometheus/prometheus/pkg/labels" 30 "github.com/prometheus/prometheus/storage" 31 "go.uber.org/zap" 32 "google.golang.org/protobuf/types/known/timestamppb" 33 34 "go.opentelemetry.io/collector/config" 35 "go.opentelemetry.io/collector/consumer" 36 "go.opentelemetry.io/collector/obsreport" 37 "go.opentelemetry.io/collector/translator/internaldata" 38) 39 40const ( 41 portAttr = "port" 42 schemeAttr = "scheme" 43 jobAttr = "job" 44 instanceAttr = "instance" 45 46 transport = "http" 47 dataformat = "prometheus" 48) 49 50var errMetricNameNotFound = errors.New("metricName not found from labels") 51var errTransactionAborted = errors.New("transaction aborted") 52var errNoJobInstance = errors.New("job or instance cannot be found from labels") 53var errNoStartTimeMetrics = errors.New("process_start_time_seconds metric is missing") 54 55// A transaction is corresponding to an individual scrape operation or stale report. 56// That said, whenever prometheus receiver scrapped a target metric endpoint a page of raw metrics is returned, 57// a transaction, which acts as appender, is created to process this page of data, the scrapeLoop will call the Add or 58// AddFast method to insert metrics data points, when finished either Commit, which means success, is called and data 59// will be flush to the downstream consumer, or Rollback, which means discard all the data, is called and all data 60// points are discarded. 61type transaction struct { 62 id int64 63 ctx context.Context 64 isNew bool 65 sink consumer.Metrics 66 job string 67 instance string 68 jobsMap *JobsMap 69 useStartTimeMetric bool 70 startTimeMetricRegex string 71 receiverID config.ComponentID 72 ms *metadataService 73 node *commonpb.Node 74 resource *resourcepb.Resource 75 metricBuilder *metricBuilder 76 externalLabels labels.Labels 77 logger *zap.Logger 78 obsrecv *obsreport.Receiver 79 stalenessStore *stalenessStore 80 startTimeMs int64 81} 82 83func newTransaction( 84 ctx context.Context, 85 jobsMap *JobsMap, 86 useStartTimeMetric bool, 87 startTimeMetricRegex string, 88 receiverID config.ComponentID, 89 ms *metadataService, 90 sink consumer.Metrics, 91 externalLabels labels.Labels, 92 logger *zap.Logger, stalenessStore *stalenessStore) *transaction { 93 return &transaction{ 94 id: atomic.AddInt64(&idSeq, 1), 95 ctx: ctx, 96 isNew: true, 97 sink: sink, 98 jobsMap: jobsMap, 99 useStartTimeMetric: useStartTimeMetric, 100 startTimeMetricRegex: startTimeMetricRegex, 101 receiverID: receiverID, 102 ms: ms, 103 externalLabels: externalLabels, 104 logger: logger, 105 obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiverID, Transport: transport}), 106 stalenessStore: stalenessStore, 107 startTimeMs: -1, 108 } 109} 110 111// ensure *transaction has implemented the storage.Appender interface 112var _ storage.Appender = (*transaction)(nil) 113 114// Append always returns 0 to disable label caching. 115func (tr *transaction) Append(ref uint64, ls labels.Labels, t int64, v float64) (uint64, error) { 116 if tr.startTimeMs < 0 { 117 tr.startTimeMs = t 118 } 119 // Important, must handle. prometheus will still try to feed the appender some data even if it failed to 120 // scrape the remote target, if the previous scrape was success and some data were cached internally 121 // in our case, we don't need these data, simply drop them shall be good enough. more details: 122 // https://github.com/prometheus/prometheus/blob/851131b0740be7291b98f295567a97f32fffc655/scrape/scrape.go#L933-L935 123 if math.IsNaN(v) { 124 return 0, nil 125 } 126 127 select { 128 case <-tr.ctx.Done(): 129 return 0, errTransactionAborted 130 default: 131 } 132 if len(tr.externalLabels) > 0 { 133 // TODO(jbd): Improve the allocs. 134 ls = append(ls, tr.externalLabels...) 135 } 136 if tr.isNew { 137 if err := tr.initTransaction(ls); err != nil { 138 return 0, err 139 } 140 } 141 142 return 0, tr.metricBuilder.AddDataPoint(ls, t, v) 143} 144 145func (tr *transaction) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { 146 return 0, nil 147} 148 149// AddFast always returns error since caching is not supported by Add() function. 150func (tr *transaction) AddFast(_ uint64, _ int64, _ float64) error { 151 return storage.ErrNotFound 152} 153 154func (tr *transaction) initTransaction(ls labels.Labels) error { 155 job, instance := ls.Get(model.JobLabel), ls.Get(model.InstanceLabel) 156 if job == "" || instance == "" { 157 return errNoJobInstance 158 } 159 // discover the binding target when this method is called for the first time during a transaction 160 mc, err := tr.ms.Get(job, instance) 161 if err != nil { 162 return err 163 } 164 if tr.jobsMap != nil { 165 tr.job = job 166 tr.instance = instance 167 } 168 tr.node, tr.resource = createNodeAndResource(job, instance, mc.SharedLabels().Get(model.SchemeLabel)) 169 tr.metricBuilder = newMetricBuilder(mc, tr.useStartTimeMetric, tr.startTimeMetricRegex, tr.logger, tr.stalenessStore) 170 tr.isNew = false 171 return nil 172} 173 174// Commit submits metrics data to consumers. 175func (tr *transaction) Commit() error { 176 if tr.isNew { 177 // In a situation like not able to connect to the remote server, scrapeloop will still commit even if it had 178 // never added any data points, that the transaction has not been initialized. 179 return nil 180 } 181 182 // Before building metrics, issue staleness markers for every stale metric. 183 staleLabels := tr.stalenessStore.emitStaleLabels() 184 185 for _, sEntry := range staleLabels { 186 tr.metricBuilder.AddDataPoint(sEntry.labels, sEntry.seenAtMs, stalenessSpecialValue) 187 } 188 189 tr.startTimeMs = -1 190 191 ctx := tr.obsrecv.StartMetricsOp(tr.ctx) 192 metrics, _, _, err := tr.metricBuilder.Build() 193 if err != nil { 194 // Only error by Build() is errNoDataToBuild, with numReceivedPoints set to zero. 195 tr.obsrecv.EndMetricsOp(ctx, dataformat, 0, err) 196 return err 197 } 198 199 if tr.useStartTimeMetric { 200 // startTime is mandatory in this case, but may be zero when the 201 // process_start_time_seconds metric is missing from the target endpoint. 202 if tr.metricBuilder.startTime == 0.0 { 203 // Since we are unable to adjust metrics properly, we will drop them 204 // and return an error. 205 err = errNoStartTimeMetrics 206 tr.obsrecv.EndMetricsOp(ctx, dataformat, 0, err) 207 return err 208 } 209 210 adjustStartTimestamp(tr.metricBuilder.startTime, metrics) 211 } else { 212 // AdjustMetrics - jobsMap has to be non-nil in this case. 213 // Note: metrics could be empty after adjustment, which needs to be checked before passing it on to ConsumeMetrics() 214 metrics, _ = NewMetricsAdjuster(tr.jobsMap.get(tr.job, tr.instance), tr.logger).AdjustMetrics(metrics) 215 } 216 217 numPoints := 0 218 if len(metrics) > 0 { 219 md := internaldata.OCToMetrics(tr.node, tr.resource, metrics) 220 numPoints = md.DataPointCount() 221 err = tr.sink.ConsumeMetrics(ctx, md) 222 } 223 tr.obsrecv.EndMetricsOp(ctx, dataformat, numPoints, err) 224 return err 225} 226 227func (tr *transaction) Rollback() error { 228 tr.startTimeMs = -1 229 return nil 230} 231 232func adjustStartTimestamp(startTime float64, metrics []*metricspb.Metric) { 233 startTimeTs := timestampFromFloat64(startTime) 234 for _, metric := range metrics { 235 switch metric.GetMetricDescriptor().GetType() { 236 case metricspb.MetricDescriptor_GAUGE_DOUBLE, metricspb.MetricDescriptor_GAUGE_DISTRIBUTION: 237 continue 238 default: 239 for _, ts := range metric.GetTimeseries() { 240 ts.StartTimestamp = startTimeTs 241 } 242 } 243 } 244} 245 246func timestampFromFloat64(ts float64) *timestamppb.Timestamp { 247 secs := int64(ts) 248 nanos := int64((ts - float64(secs)) * 1e9) 249 return ×tamppb.Timestamp{ 250 Seconds: secs, 251 Nanos: int32(nanos), 252 } 253} 254 255func createNodeAndResource(job, instance, scheme string) (*commonpb.Node, *resourcepb.Resource) { 256 host, port, err := net.SplitHostPort(instance) 257 if err != nil { 258 host = instance 259 } 260 node := &commonpb.Node{ 261 ServiceInfo: &commonpb.ServiceInfo{Name: job}, 262 Identifier: &commonpb.ProcessIdentifier{ 263 HostName: host, 264 }, 265 } 266 resource := &resourcepb.Resource{ 267 Labels: map[string]string{ 268 jobAttr: job, 269 instanceAttr: instance, 270 portAttr: port, 271 schemeAttr: scheme, 272 }, 273 } 274 return node, resource 275} 276