// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
	"bufio"
	"bytes"
	"context"
	"io"
	"math"
	"sort"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/gogo/protobuf/jsonpb"
	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/fileutil"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/thanos-io/thanos/pkg/component"
	"github.com/thanos-io/thanos/pkg/runutil"
	"github.com/thanos-io/thanos/pkg/store/labelpb"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// LocalStore implements the store API against single file with stream of proto-based SeriesResponses in JSON format.
// Inefficient implementation for quick StoreAPI view.
// Chunk order is exactly the same as in a given file.
type LocalStore struct {
	logger    log.Logger
	extLabels labels.Labels // External labels advertised (and matched against) by this store.

	info *storepb.InfoResponse // Precomputed Info response: store type, label sets, global min/max time.
	c    io.Closer             // Underlying mmapped file; released by Close.

	// TODO(bwplotka): This is very naive in-memory DB. We can support much larger files, by
	// indexing labels, symbolizing strings and get chunk refs only without storing protobufs in memory.
	// For small debug purposes, this is good enough.
	series       []*storepb.Series
	sortedChunks [][]int // Per-series chunk indices, ordered by chunk MinTime.
}

// TODO(bwplotka): Add remote read so Prometheus users can use this. Potentially after streaming will be added
// https://github.com/prometheus/prometheus/issues/5926.
// TODO(bwplotka): Consider non mmaped version of this, as well different versions.
49func NewLocalStoreFromJSONMmappableFile( 50 logger log.Logger, 51 component component.StoreAPI, 52 extLabels labels.Labels, 53 path string, 54 split bufio.SplitFunc, 55) (*LocalStore, error) { 56 f, err := fileutil.OpenMmapFile(path) 57 if err != nil { 58 return nil, err 59 } 60 defer func() { 61 if err != nil { 62 runutil.CloseWithErrCapture(&err, f, "json file %s close", path) 63 } 64 }() 65 66 s := &LocalStore{ 67 logger: logger, 68 extLabels: extLabels, 69 c: f, 70 info: &storepb.InfoResponse{ 71 LabelSets: []labelpb.ZLabelSet{ 72 {Labels: labelpb.ZLabelsFromPromLabels(extLabels)}, 73 }, 74 StoreType: component.ToProto(), 75 MinTime: math.MaxInt64, 76 MaxTime: math.MinInt64, 77 }, 78 } 79 80 // Do quick pass for in-mem index. 81 content := f.Bytes() 82 contentStart := bytes.Index(content, []byte("{")) 83 if contentStart != -1 { 84 content = content[contentStart:] 85 } 86 87 if idx := bytes.LastIndex(content, []byte("}")); idx != -1 { 88 content = content[:idx+1] 89 } 90 91 skanner := NewNoCopyScanner(content, split) 92 resp := &storepb.SeriesResponse{} 93 for skanner.Scan() { 94 if err := jsonpb.Unmarshal(bytes.NewReader(skanner.Bytes()), resp); err != nil { 95 return nil, errors.Wrapf(err, "unmarshal storepb.SeriesResponse frame for file %s", path) 96 } 97 series := resp.GetSeries() 98 if series == nil { 99 level.Warn(logger).Log("msg", "not a valid series", "frame", resp.String()) 100 continue 101 } 102 chks := make([]int, 0, len(series.Chunks)) 103 // Sort chunks in separate slice by MinTime for easier lookup. Find global max and min. 
104 for ci, c := range series.Chunks { 105 if s.info.MinTime > c.MinTime { 106 s.info.MinTime = c.MinTime 107 } 108 if s.info.MaxTime < c.MaxTime { 109 s.info.MaxTime = c.MaxTime 110 } 111 chks = append(chks, ci) 112 } 113 114 sort.Slice(chks, func(i, j int) bool { 115 return series.Chunks[chks[i]].MinTime < series.Chunks[chks[j]].MinTime 116 }) 117 s.series = append(s.series, series) 118 s.sortedChunks = append(s.sortedChunks, chks) 119 } 120 121 if err := skanner.Err(); err != nil { 122 return nil, errors.Wrapf(err, "scanning file %s", path) 123 } 124 level.Info(logger).Log("msg", "loading JSON file succeeded", "file", path, "info", s.info.String(), "series", len(s.series)) 125 return s, nil 126} 127 128// ScanGRPCCurlProtoStreamMessages allows to tokenize each streamed gRPC message from grpcurl tool. 129func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, token []byte, err error) { 130 var delim = []byte(`} 131{`) 132 if atEOF && len(data) == 0 { 133 return 0, nil, nil 134 } 135 if idx := bytes.LastIndex(data, delim); idx != -1 { 136 return idx + 2, data[:idx+1], nil 137 } 138 // If we're at EOF, let's return all. 139 if atEOF { 140 return len(data), data, nil 141 } 142 // Incomplete; get more bytes. 143 return len(delim), nil, nil 144} 145 146// Info returns store information about the Prometheus instance. 147func (s *LocalStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.InfoResponse, error) { 148 return s.info, nil 149} 150 151// Series returns all series for a requested time range and label matcher. The returned data may 152// exceed the requested time bounds. 
153func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { 154 match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels) 155 if err != nil { 156 return status.Error(codes.InvalidArgument, err.Error()) 157 } 158 if !match { 159 return nil 160 } 161 if len(matchers) == 0 { 162 return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error()) 163 } 164 165 var chosen []int 166 for si, series := range s.series { 167 lbls := labelpb.ZLabelsToPromLabels(series.Labels) 168 var noMatch bool 169 for _, m := range matchers { 170 extValue := lbls.Get(m.Name) 171 if extValue == "" { 172 continue 173 } 174 if !m.Matches(extValue) { 175 noMatch = true 176 break 177 } 178 } 179 if noMatch { 180 continue 181 } 182 183 chosen = chosen[:0] 184 resp := &storepb.Series{ 185 Labels: series.Labels, 186 Chunks: make([]storepb.AggrChunk, 0, len(s.sortedChunks[si])), 187 } 188 189 for _, ci := range s.sortedChunks[si] { 190 if series.Chunks[ci].MaxTime < r.MinTime { 191 continue 192 } 193 if series.Chunks[ci].MinTime > r.MaxTime { 194 continue 195 } 196 chosen = append(chosen, ci) 197 } 198 199 sort.Ints(chosen) 200 for _, ci := range chosen { 201 resp.Chunks = append(resp.Chunks, series.Chunks[ci]) 202 } 203 204 if err := srv.Send(storepb.NewSeriesResponse(resp)); err != nil { 205 return status.Error(codes.Aborted, err.Error()) 206 } 207 } 208 return nil 209} 210 211// LabelNames returns all known label names. 212func (s *LocalStore) LabelNames(_ context.Context, _ *storepb.LabelNamesRequest) ( 213 *storepb.LabelNamesResponse, error, 214) { 215 // TODO(bwplotka): Consider precomputing. 
216 names := map[string]struct{}{} 217 for _, series := range s.series { 218 for _, l := range series.Labels { 219 names[l.Name] = struct{}{} 220 } 221 } 222 resp := &storepb.LabelNamesResponse{} 223 for n := range names { 224 resp.Names = append(resp.Names, n) 225 } 226 return resp, nil 227} 228 229// LabelValues returns all known label values for a given label name. 230func (s *LocalStore) LabelValues(_ context.Context, r *storepb.LabelValuesRequest) ( 231 *storepb.LabelValuesResponse, error, 232) { 233 vals := map[string]struct{}{} 234 for _, series := range s.series { 235 lbls := labelpb.ZLabelsToPromLabels(series.Labels) 236 val := lbls.Get(r.Label) 237 if val == "" { 238 continue 239 } 240 vals[val] = struct{}{} 241 } 242 resp := &storepb.LabelValuesResponse{} 243 for val := range vals { 244 resp.Values = append(resp.Values, val) 245 } 246 return resp, nil 247} 248 249func (s *LocalStore) Close() (err error) { 250 return s.c.Close() 251} 252 253type noCopyScanner struct { 254 b []byte 255 splitFunc bufio.SplitFunc 256 257 start, end int 258 err error 259 260 token []byte 261} 262 263// NewNoCopyScanner returns bufio.Scanner-like scanner that is meant to be used on already allocated byte slice (or mmapped) 264// one. Returned tokens are shared. 
265func NewNoCopyScanner(b []byte, splitFunc bufio.SplitFunc) *noCopyScanner { 266 return &noCopyScanner{ 267 b: b, 268 splitFunc: splitFunc, 269 start: 0, 270 end: 0, 271 } 272} 273 274func (s *noCopyScanner) Scan() bool { 275 if s.start >= len(s.b) { 276 return false 277 } 278 279 advance := 1 280 for s.end+advance < len(s.b) { 281 s.end += advance 282 283 advance, s.token, s.err = s.splitFunc(s.b[s.start:s.end], false) 284 if s.err != nil { 285 return false 286 } 287 288 if len(s.token) > 0 { 289 s.start += advance 290 s.end = s.start 291 return true 292 } 293 } 294 295 _, s.token, s.err = s.splitFunc(s.b[s.start:], true) 296 if s.err != nil { 297 return false 298 } 299 s.start = len(s.b) 300 return len(s.token) > 0 301} 302 303func (s *noCopyScanner) Bytes() []byte { 304 return s.token 305} 306 307func (s *noCopyScanner) Err() error { 308 return s.err 309} 310