1// Copyright 2015 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package web 15 16import ( 17 "fmt" 18 "net/http" 19 "sort" 20 21 "github.com/go-kit/kit/log/level" 22 "github.com/gogo/protobuf/proto" 23 "github.com/prometheus/client_golang/prometheus" 24 dto "github.com/prometheus/client_model/go" 25 "github.com/prometheus/common/expfmt" 26 "github.com/prometheus/common/model" 27 28 "github.com/prometheus/prometheus/pkg/labels" 29 "github.com/prometheus/prometheus/pkg/timestamp" 30 "github.com/prometheus/prometheus/pkg/value" 31 "github.com/prometheus/prometheus/promql" 32 "github.com/prometheus/prometheus/storage" 33) 34 35var ( 36 federationErrors = prometheus.NewCounter(prometheus.CounterOpts{ 37 Name: "prometheus_web_federation_errors_total", 38 Help: "Total number of errors that occurred while sending federation responses.", 39 }) 40 federationWarnings = prometheus.NewCounter(prometheus.CounterOpts{ 41 Name: "prometheus_web_federation_warnings_total", 42 Help: "Total number of warnings that occurred while sending federation responses.", 43 }) 44) 45 46func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { 47 h.mtx.RLock() 48 defer h.mtx.RUnlock() 49 50 if err := req.ParseForm(); err != nil { 51 http.Error(w, fmt.Sprintf("error parsing form values: %v", err), http.StatusBadRequest) 52 return 53 } 54 55 var matcherSets [][]*labels.Matcher 56 for _, s := range req.Form["match[]"] { 57 matchers, err := promql.ParseMetricSelector(s) 58 if err != nil { 59 http.Error(w, err.Error(), http.StatusBadRequest) 60 return 61 } 62 matcherSets = append(matcherSets, matchers) 63 } 64 65 var ( 66 mint = timestamp.FromTime(h.now().Time().Add(-promql.LookbackDelta)) 67 maxt = timestamp.FromTime(h.now().Time()) 68 format = expfmt.Negotiate(req.Header) 69 enc = expfmt.NewEncoder(w, format) 70 ) 71 w.Header().Set("Content-Type", string(format)) 72 73 q, err := h.storage.Querier(req.Context(), mint, maxt) 74 if err != nil { 75 federationErrors.Inc() 76 http.Error(w, err.Error(), http.StatusInternalServerError) 77 return 78 } 79 defer q.Close() 80 81 vec := make(promql.Vector, 0, 8000) 82 83 params := &storage.SelectParams{ 84 Start: mint, 85 End: maxt, 86 } 87 88 var sets []storage.SeriesSet 89 for _, mset := range matcherSets { 90 s, wrns, err := q.Select(params, mset...) 91 if wrns != nil { 92 level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns) 93 federationWarnings.Add(float64(len(wrns))) 94 } 95 if err != nil { 96 federationErrors.Inc() 97 http.Error(w, err.Error(), http.StatusInternalServerError) 98 return 99 } 100 sets = append(sets, s) 101 } 102 103 set := storage.NewMergeSeriesSet(sets, nil) 104 it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6)) 105 for set.Next() { 106 s := set.At() 107 108 // TODO(fabxc): allow fast path for most recent sample either 109 // in the storage itself or caching layer in Prometheus. 110 it.Reset(s.Iterator()) 111 112 var t int64 113 var v float64 114 115 ok := it.Seek(maxt) 116 if ok { 117 t, v = it.Values() 118 } else { 119 t, v, ok = it.PeekBack(1) 120 if !ok { 121 continue 122 } 123 } 124 // The exposition formats do not support stale markers, so drop them. This 125 // is good enough for staleness handling of federated data, as the 126 // interval-based limits on staleness will do the right thing for supported 127 // use cases (which is to say federating aggregated time series). 128 if value.IsStaleNaN(v) { 129 continue 130 } 131 132 vec = append(vec, promql.Sample{ 133 Metric: s.Labels(), 134 Point: promql.Point{T: t, V: v}, 135 }) 136 } 137 if set.Err() != nil { 138 federationErrors.Inc() 139 http.Error(w, set.Err().Error(), http.StatusInternalServerError) 140 return 141 } 142 143 sort.Sort(byName(vec)) 144 145 externalLabels := h.config.GlobalConfig.ExternalLabels.Map() 146 if _, ok := externalLabels[model.InstanceLabel]; !ok { 147 externalLabels[model.InstanceLabel] = "" 148 } 149 externalLabelNames := make([]string, 0, len(externalLabels)) 150 for ln := range externalLabels { 151 externalLabelNames = append(externalLabelNames, ln) 152 } 153 sort.Strings(externalLabelNames) 154 155 var ( 156 lastMetricName string 157 protMetricFam *dto.MetricFamily 158 ) 159 for _, s := range vec { 160 nameSeen := false 161 globalUsed := map[string]struct{}{} 162 protMetric := &dto.Metric{ 163 Untyped: &dto.Untyped{}, 164 } 165 166 for _, l := range s.Metric { 167 if l.Value == "" { 168 // No value means unset. Never consider those labels. 169 // This is also important to protect against nameless metrics. 170 continue 171 } 172 if l.Name == labels.MetricName { 173 nameSeen = true 174 if l.Value == lastMetricName { 175 // We already have the name in the current MetricFamily, 176 // and we ignore nameless metrics. 177 continue 178 } 179 // Need to start a new MetricFamily. Ship off the old one (if any) before 180 // creating the new one. 181 if protMetricFam != nil { 182 if err := enc.Encode(protMetricFam); err != nil { 183 federationErrors.Inc() 184 level.Error(h.logger).Log("msg", "federation failed", "err", err) 185 return 186 } 187 } 188 protMetricFam = &dto.MetricFamily{ 189 Type: dto.MetricType_UNTYPED.Enum(), 190 Name: proto.String(l.Value), 191 } 192 lastMetricName = l.Value 193 continue 194 } 195 protMetric.Label = append(protMetric.Label, &dto.LabelPair{ 196 Name: proto.String(l.Name), 197 Value: proto.String(l.Value), 198 }) 199 if _, ok := externalLabels[l.Name]; ok { 200 globalUsed[l.Name] = struct{}{} 201 } 202 } 203 if !nameSeen { 204 level.Warn(h.logger).Log("msg", "Ignoring nameless metric during federation", "metric", s.Metric) 205 continue 206 } 207 // Attach global labels if they do not exist yet. 208 for _, ln := range externalLabelNames { 209 lv := externalLabels[ln] 210 if _, ok := globalUsed[string(ln)]; !ok { 211 protMetric.Label = append(protMetric.Label, &dto.LabelPair{ 212 Name: proto.String(string(ln)), 213 Value: proto.String(string(lv)), 214 }) 215 } 216 } 217 218 protMetric.TimestampMs = proto.Int64(s.T) 219 protMetric.Untyped.Value = proto.Float64(s.V) 220 221 protMetricFam.Metric = append(protMetricFam.Metric, protMetric) 222 } 223 // Still have to ship off the last MetricFamily, if any. 224 if protMetricFam != nil { 225 if err := enc.Encode(protMetricFam); err != nil { 226 federationErrors.Inc() 227 level.Error(h.logger).Log("msg", "federation failed", "err", err) 228 } 229 } 230} 231 232// byName makes a model.Vector sortable by metric name. 233type byName promql.Vector 234 235func (vec byName) Len() int { return len(vec) } 236func (vec byName) Swap(i, j int) { vec[i], vec[j] = vec[j], vec[i] } 237 238func (vec byName) Less(i, j int) bool { 239 ni := vec[i].Metric.Get(labels.MetricName) 240 nj := vec[j].Metric.Get(labels.MetricName) 241 return ni < nj 242} 243