1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package store
5
6import (
7	"bufio"
8	"bytes"
9	"context"
10	"io"
11	"math"
12	"sort"
13
14	"github.com/go-kit/kit/log"
15	"github.com/go-kit/kit/log/level"
16	"github.com/gogo/protobuf/jsonpb"
17	"github.com/pkg/errors"
18	"github.com/prometheus/prometheus/pkg/labels"
19	"github.com/prometheus/prometheus/tsdb/fileutil"
20	"google.golang.org/grpc/codes"
21	"google.golang.org/grpc/status"
22
23	"github.com/thanos-io/thanos/pkg/component"
24	"github.com/thanos-io/thanos/pkg/runutil"
25	"github.com/thanos-io/thanos/pkg/store/labelpb"
26	"github.com/thanos-io/thanos/pkg/store/storepb"
27)
28
// LocalStore implements the store API against single file with stream of proto-based SeriesResponses in JSON format.
// Inefficient implementation for quick StoreAPI view.
// Chunk order is exactly the same as in a given file.
type LocalStore struct {
	logger    log.Logger
	// extLabels are the external labels advertised by this store; they are used
	// to pre-filter incoming matchers in Series.
	extLabels labels.Labels

	// info is the precomputed InfoResponse returned by Info; its Min/MaxTime
	// are the global chunk time bounds found while indexing the file.
	info *storepb.InfoResponse
	// c closes the underlying mmapped file.
	c    io.Closer

	// TODO(bwplotka): This is very naive in-memory DB. We can support much larger files, by
	// indexing labels, symbolizing strings and get chunk refs only without storing protobufs in memory.
	// For small debug purposes, this is good enough.
	series       []*storepb.Series
	// sortedChunks[i] holds indices into series[i].Chunks ordered by chunk MinTime,
	// for cheaper time-range selection.
	sortedChunks [][]int
}
45
46// TODO(bwplotka): Add remote read so Prometheus users can use this. Potentially after streaming will be added
47// https://github.com/prometheus/prometheus/issues/5926.
48// TODO(bwplotka): Consider non mmaped version of this, as well different versions.
49func NewLocalStoreFromJSONMmappableFile(
50	logger log.Logger,
51	component component.StoreAPI,
52	extLabels labels.Labels,
53	path string,
54	split bufio.SplitFunc,
55) (*LocalStore, error) {
56	f, err := fileutil.OpenMmapFile(path)
57	if err != nil {
58		return nil, err
59	}
60	defer func() {
61		if err != nil {
62			runutil.CloseWithErrCapture(&err, f, "json file %s close", path)
63		}
64	}()
65
66	s := &LocalStore{
67		logger:    logger,
68		extLabels: extLabels,
69		c:         f,
70		info: &storepb.InfoResponse{
71			LabelSets: []labelpb.ZLabelSet{
72				{Labels: labelpb.ZLabelsFromPromLabels(extLabels)},
73			},
74			StoreType: component.ToProto(),
75			MinTime:   math.MaxInt64,
76			MaxTime:   math.MinInt64,
77		},
78	}
79
80	// Do quick pass for in-mem index.
81	content := f.Bytes()
82	contentStart := bytes.Index(content, []byte("{"))
83	if contentStart != -1 {
84		content = content[contentStart:]
85	}
86
87	if idx := bytes.LastIndex(content, []byte("}")); idx != -1 {
88		content = content[:idx+1]
89	}
90
91	skanner := NewNoCopyScanner(content, split)
92	resp := &storepb.SeriesResponse{}
93	for skanner.Scan() {
94		if err := jsonpb.Unmarshal(bytes.NewReader(skanner.Bytes()), resp); err != nil {
95			return nil, errors.Wrapf(err, "unmarshal storepb.SeriesResponse frame for file %s", path)
96		}
97		series := resp.GetSeries()
98		if series == nil {
99			level.Warn(logger).Log("msg", "not a valid series", "frame", resp.String())
100			continue
101		}
102		chks := make([]int, 0, len(series.Chunks))
103		// Sort chunks in separate slice by MinTime for easier lookup. Find global max and min.
104		for ci, c := range series.Chunks {
105			if s.info.MinTime > c.MinTime {
106				s.info.MinTime = c.MinTime
107			}
108			if s.info.MaxTime < c.MaxTime {
109				s.info.MaxTime = c.MaxTime
110			}
111			chks = append(chks, ci)
112		}
113
114		sort.Slice(chks, func(i, j int) bool {
115			return series.Chunks[chks[i]].MinTime < series.Chunks[chks[j]].MinTime
116		})
117		s.series = append(s.series, series)
118		s.sortedChunks = append(s.sortedChunks, chks)
119	}
120
121	if err := skanner.Err(); err != nil {
122		return nil, errors.Wrapf(err, "scanning file %s", path)
123	}
124	level.Info(logger).Log("msg", "loading JSON file succeeded", "file", path, "info", s.info.String(), "series", len(s.series))
125	return s, nil
126}
127
// ScanGRPCCurlProtoStreamMessages allows to tokenize each streamed gRPC message from grpcurl tool.
// Messages are JSON objects separated by a "}\n{" boundary. Note the split semantics are
// tailored to NewNoCopyScanner: when more input is needed, a positive advance (the delimiter
// length) is returned so that scanner grows its window.
func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, token []byte, err error) {
	delim := []byte("}\n{")
	if atEOF && len(data) == 0 {
		// Nothing left to tokenize.
		return 0, nil, nil
	}
	idx := bytes.LastIndex(data, delim)
	if idx >= 0 {
		// Token keeps the closing brace; advance stops right before the opening
		// brace of the next message.
		return idx + 2, data[:idx+1], nil
	}
	if atEOF {
		// Final message: emit whatever remains.
		return len(data), data, nil
	}
	// No full delimiter in the window yet; ask for more bytes.
	return len(delim), nil, nil
}
145
// Info returns store information about the Prometheus instance.
// The response is precomputed at construction time and shared between calls;
// callers must not mutate it.
func (s *LocalStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.InfoResponse, error) {
	return s.info, nil
}
150
151// Series returns all series for a requested time range and label matcher. The returned data may
152// exceed the requested time bounds.
153func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
154	match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels)
155	if err != nil {
156		return status.Error(codes.InvalidArgument, err.Error())
157	}
158	if !match {
159		return nil
160	}
161	if len(matchers) == 0 {
162		return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
163	}
164
165	var chosen []int
166	for si, series := range s.series {
167		lbls := labelpb.ZLabelsToPromLabels(series.Labels)
168		var noMatch bool
169		for _, m := range matchers {
170			extValue := lbls.Get(m.Name)
171			if extValue == "" {
172				continue
173			}
174			if !m.Matches(extValue) {
175				noMatch = true
176				break
177			}
178		}
179		if noMatch {
180			continue
181		}
182
183		chosen = chosen[:0]
184		resp := &storepb.Series{
185			Labels: series.Labels,
186			Chunks: make([]storepb.AggrChunk, 0, len(s.sortedChunks[si])),
187		}
188
189		for _, ci := range s.sortedChunks[si] {
190			if series.Chunks[ci].MaxTime < r.MinTime {
191				continue
192			}
193			if series.Chunks[ci].MinTime > r.MaxTime {
194				continue
195			}
196			chosen = append(chosen, ci)
197		}
198
199		sort.Ints(chosen)
200		for _, ci := range chosen {
201			resp.Chunks = append(resp.Chunks, series.Chunks[ci])
202		}
203
204		if err := srv.Send(storepb.NewSeriesResponse(resp)); err != nil {
205			return status.Error(codes.Aborted, err.Error())
206		}
207	}
208	return nil
209}
210
211// LabelNames returns all known label names.
212func (s *LocalStore) LabelNames(_ context.Context, _ *storepb.LabelNamesRequest) (
213	*storepb.LabelNamesResponse, error,
214) {
215	// TODO(bwplotka): Consider precomputing.
216	names := map[string]struct{}{}
217	for _, series := range s.series {
218		for _, l := range series.Labels {
219			names[l.Name] = struct{}{}
220		}
221	}
222	resp := &storepb.LabelNamesResponse{}
223	for n := range names {
224		resp.Names = append(resp.Names, n)
225	}
226	return resp, nil
227}
228
229// LabelValues returns all known label values for a given label name.
230func (s *LocalStore) LabelValues(_ context.Context, r *storepb.LabelValuesRequest) (
231	*storepb.LabelValuesResponse, error,
232) {
233	vals := map[string]struct{}{}
234	for _, series := range s.series {
235		lbls := labelpb.ZLabelsToPromLabels(series.Labels)
236		val := lbls.Get(r.Label)
237		if val == "" {
238			continue
239		}
240		vals[val] = struct{}{}
241	}
242	resp := &storepb.LabelValuesResponse{}
243	for val := range vals {
244		resp.Values = append(resp.Values, val)
245	}
246	return resp, nil
247}
248
// Close releases the underlying mmapped file.
func (s *LocalStore) Close() (err error) {
	return s.c.Close()
}
252
// noCopyScanner is a bufio.Scanner-like tokenizer that works directly on an
// already-loaded (e.g. mmapped) byte slice. Tokens returned by Bytes alias the
// input slice, so no per-token copy is made and they are only valid as long as
// the input is.
type noCopyScanner struct {
	b         []byte
	splitFunc bufio.SplitFunc

	// start and end delimit the window of b handed to splitFunc.
	start, end int
	err        error

	token []byte
}

// NewNoCopyScanner returns bufio.Scanner-like scanner that is meant to be used on already allocated byte slice (or mmapped)
// one. Returned tokens are shared.
func NewNoCopyScanner(b []byte, splitFunc bufio.SplitFunc) *noCopyScanner {
	return &noCopyScanner{
		b:         b,
		splitFunc: splitFunc,
		start:     0,
		end:       0,
	}
}

// Scan advances to the next token, which becomes available via Bytes.
// It returns false when the input is exhausted or splitFunc returns an error
// (retrievable via Err).
func (s *noCopyScanner) Scan() bool {
	if s.start >= len(s.b) {
		return false
	}

	advance := 1
	for s.end+advance < len(s.b) {
		s.end += advance

		advance, s.token, s.err = s.splitFunc(s.b[s.start:s.end], false)
		if s.err != nil {
			return false
		}

		if len(s.token) > 0 {
			s.start += advance
			s.end = s.start
			return true
		}

		// Standard bufio.SplitFuncs signal "need more data" with advance == 0,
		// which previously made this loop spin forever. Grow the window by at
		// least one byte per iteration to guarantee progress.
		if advance < 1 {
			advance = 1
		}
	}

	// Whatever remains is the final (possibly partial) frame.
	_, s.token, s.err = s.splitFunc(s.b[s.start:], true)
	if s.err != nil {
		return false
	}
	s.start = len(s.b)
	return len(s.token) > 0
}

// Bytes returns the current token. The slice aliases the scanner's input.
func (s *noCopyScanner) Bytes() []byte {
	return s.token
}

// Err returns the first error encountered by splitFunc, if any.
func (s *noCopyScanner) Err() error {
	return s.err
}
310