1package query
2
3import (
4	"context"
5	"errors"
6	"flag"
7	"fmt"
8	"log"
9	"os"
10	"sort"
11	"strings"
12	"text/tabwriter"
13	"time"
14
15	util_log "github.com/cortexproject/cortex/pkg/util/log"
16	"github.com/fatih/color"
17	json "github.com/json-iterator/go"
18	"github.com/prometheus/client_golang/prometheus"
19	"github.com/weaveworks/common/user"
20
21	"github.com/grafana/loki/pkg/logcli/client"
22	"github.com/grafana/loki/pkg/logcli/output"
23	"github.com/grafana/loki/pkg/loghttp"
24	"github.com/grafana/loki/pkg/logproto"
25	"github.com/grafana/loki/pkg/logql"
26	"github.com/grafana/loki/pkg/logqlmodel"
27	"github.com/grafana/loki/pkg/logqlmodel/stats"
28	"github.com/grafana/loki/pkg/loki"
29	"github.com/grafana/loki/pkg/storage"
30	chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
31	"github.com/grafana/loki/pkg/storage/stores/shipper"
32	"github.com/grafana/loki/pkg/util/cfg"
33	"github.com/grafana/loki/pkg/util/marshal"
34	"github.com/grafana/loki/pkg/validation"
35)
36
// streamEntryPair couples a single log entry with the labels of the stream
// it came from, so entries from multiple streams can be merged and sorted
// into one list while retaining their origin labels for printing.
type streamEntryPair struct {
	entry  loghttp.Entry
	labels loghttp.LabelSet
}
41
// Query contains all necessary fields to execute instant and range queries and print the results.
type Query struct {
	QueryString     string        // the LogQL query to execute
	Start           time.Time     // start of the query window; equals End for instant queries
	End             time.Time     // end of the query window
	Limit           int           // maximum total number of entries to print
	BatchSize       int           // maximum entries requested per call when batching a range query
	Forward         bool          // true: oldest-first (FORWARD); false: newest-first (BACKWARD)
	Step            time.Duration // step between evaluations for metric range queries
	Interval        time.Duration // interval between returned log entries for log range queries
	Quiet           bool          // suppress informational output such as common labels
	NoLabels        bool          // not read in this file; presumably tells the output layer to omit labels — verify against output package
	IgnoreLabelsKey []string      // label keys removed from printed output
	ShowLabelsKey   []string      // if non-empty, only these label keys are printed
	FixedLabelsLen  int           // minimum width used when aligning the labels column
	ColoredOutput   bool          // not read in this file; presumably enables colored output formatting — verify against output package
	LocalConfig     string        // path to a Loki config file; when set, queries run against the local store
}
60
61// DoQuery executes the query and prints out the results
62func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) {
63	if q.LocalConfig != "" {
64		if err := q.DoLocalQuery(out, statistics, c.GetOrgID()); err != nil {
65			log.Fatalf("Query failed: %+v", err)
66		}
67		return
68	}
69
70	d := q.resultsDirection()
71
72	var resp *loghttp.QueryResponse
73	var err error
74
75	if q.isInstant() {
76		resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet)
77		if err != nil {
78			log.Fatalf("Query failed: %+v", err)
79		}
80		if statistics {
81			q.printStats(resp.Data.Statistics)
82		}
83		_, _ = q.printResult(resp.Data.Result, out, nil)
84	} else {
85		if q.Limit < q.BatchSize {
86			q.BatchSize = q.Limit
87		}
88		resultLength := 0
89		total := 0
90		start := q.Start
91		end := q.End
92		var lastEntry []*loghttp.Entry
93		for total < q.Limit {
94			bs := q.BatchSize
95			// We want to truncate the batch size if the remaining number
96			// of items needed to reach the limit is less than the batch size
97			if q.Limit-total < q.BatchSize {
98				// Truncated batchsize is q.Limit - total, however we add to this
99				// the length of the overlap from the last query to make sure we get the
100				// correct amount of new logs knowing there will be some overlapping logs returned.
101				bs = q.Limit - total + len(lastEntry)
102			}
103			resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet)
104			if err != nil {
105				log.Fatalf("Query failed: %+v", err)
106			}
107
108			if statistics {
109				q.printStats(resp.Data.Statistics)
110			}
111
112			resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry)
113			// Was not a log stream query, or no results, no more batching
114			if resultLength <= 0 {
115				break
116			}
117			// Also no result, wouldn't expect to hit this.
118			if len(lastEntry) == 0 {
119				break
120			}
121			// Can only happen if all the results return in one request
122			if resultLength == q.Limit {
123				break
124			}
125			if len(lastEntry) >= q.BatchSize {
126				log.Fatalf("Invalid batch size %v, the next query will have %v overlapping entries "+
127					"(there will always be 1 overlapping entry but Loki allows multiple entries to have "+
128					"the same timestamp, so when a batch ends in this scenario the next query will include "+
129					"all the overlapping entries again).  Please increase your batch size to at least %v to account "+
130					"for overlapping entryes\n", q.BatchSize, len(lastEntry), len(lastEntry)+1)
131			}
132
133			// Batching works by taking the timestamp of the last query and using it in the next query,
134			// because Loki supports multiple entries with the same timestamp it's possible for a batch to have
135			// fallen in the middle of a list of entries for the same time, so to make sure we get all entries
136			// we start the query on the same time as the last entry from the last batch, and then we keep this last
137			// entry and remove the duplicate when printing the results.
138			// Because of this duplicate entry, we have to subtract it here from the total for each batch
139			// to get the desired limit.
140			total += resultLength
141			// Based on the query direction we either set the start or end for the next query.
142			// If there are multiple entries in `lastEntry` they have to have the same timestamp so we can pick just the first
143			if q.Forward {
144				start = lastEntry[0].Timestamp
145			} else {
146				// The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping result
147				// fudge the timestamp forward in time to make sure to get the last entry from this batch in the next query
148				end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond)
149			}
150
151		}
152	}
153}
154
// printResult dispatches a query result value to the appropriate printer
// based on its type. For stream results it returns the number of entries
// printed and the entries sharing the last timestamp (used for batching);
// for scalar/matrix/vector results it returns (-1, nil) since batching
// does not apply. Unsupported result types terminate the process.
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
	length := -1
	var entry []*loghttp.Entry
	switch value.Type() {
	case logqlmodel.ValueTypeStreams:
		length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry)
	case loghttp.ResultTypeScalar:
		q.printScalar(value.(loghttp.Scalar))
	case loghttp.ResultTypeMatrix:
		q.printMatrix(value.(loghttp.Matrix))
	case loghttp.ResultTypeVector:
		q.printVector(value.(loghttp.Vector))
	default:
		log.Fatalf("Unable to print unsupported type: %v", value.Type())
	}
	return length, entry
}
172
// DoLocalQuery executes the query against the local store using a Loki configuration file.
//
// It loads and validates the Loki config named by q.LocalConfig, builds a
// read-only store and a LogQL engine over it, runs the query under the
// given orgID, optionally prints statistics, and prints the result.
// NOTE(review): flags are registered on the global flag.CommandLine, so
// calling this more than once per process would panic on duplicate flag
// registration — presumably only called once from the CLI; verify.
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string) error {
	var conf loki.Config
	conf.RegisterFlags(flag.CommandLine)
	if q.LocalConfig == "" {
		return errors.New("no supplied config file")
	}
	// Load the YAML config over the flag-registered defaults.
	if err := cfg.YAML(q.LocalConfig, false)(&conf); err != nil {
		return err
	}

	if err := conf.Validate(); err != nil {
		return err
	}

	limits, err := validation.NewOverrides(conf.LimitsConfig, nil)
	if err != nil {
		return err
	}
	storage.RegisterCustomIndexClients(&conf.StorageConfig, prometheus.DefaultRegisterer)
	// Read-only mode: logcli must never compact or upload index files.
	conf.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
	chunkStore, err := chunk_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
	if err != nil {
		return err
	}

	querier, err := storage.NewStore(conf.StorageConfig, conf.SchemaConfig, chunkStore, prometheus.DefaultRegisterer)
	if err != nil {
		return err
	}

	eng := logql.NewEngine(conf.Querier.Engine, querier, limits)
	var query logql.Query

	// Instant queries use Start for both ends with zero step/interval;
	// range queries carry the full window and step/interval through.
	if q.isInstant() {
		query = eng.Query(logql.NewLiteralParams(
			q.QueryString,
			q.Start,
			q.Start,
			0,
			0,
			q.resultsDirection(),
			uint32(q.Limit),
			nil,
		))
	} else {
		query = eng.Query(logql.NewLiteralParams(
			q.QueryString,
			q.Start,
			q.End,
			q.Step,
			q.Interval,
			q.resultsDirection(),
			uint32(q.Limit),
			nil,
		))
	}

	// execute the query
	ctx := user.InjectOrgID(context.Background(), orgID)
	result, err := query.Exec(ctx)
	if err != nil {
		return err
	}

	if statistics {
		q.printStats(result.Statistics)
	}

	// Convert the engine result into the loghttp representation the
	// printers understand.
	value, err := marshal.NewResultValue(result.Data)
	if err != nil {
		return err
	}

	q.printResult(value, out, nil)
	return nil
}
250
251// SetInstant makes the Query an instant type
252func (q *Query) SetInstant(time time.Time) {
253	q.Start = time
254	q.End = time
255}
256
257func (q *Query) isInstant() bool {
258	return q.Start == q.End && q.Step == 0
259}
260
261func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
262	common := commonLabels(streams)
263
264	// Remove the labels we want to show from common
265	if len(q.ShowLabelsKey) > 0 {
266		common = matchLabels(false, common, q.ShowLabelsKey)
267	}
268
269	if len(common) > 0 && !q.Quiet {
270		log.Println("Common labels:", color.RedString(common.String()))
271	}
272
273	if len(q.IgnoreLabelsKey) > 0 && !q.Quiet {
274		log.Println("Ignoring labels key:", color.RedString(strings.Join(q.IgnoreLabelsKey, ",")))
275	}
276
277	if len(q.ShowLabelsKey) > 0 && !q.Quiet {
278		log.Println("Print only labels key:", color.RedString(strings.Join(q.ShowLabelsKey, ",")))
279	}
280
281	// Remove ignored and common labels from the cached labels and
282	// calculate the max labels length
283	maxLabelsLen := q.FixedLabelsLen
284	for i, s := range streams {
285		// Remove common labels
286		ls := subtract(s.Labels, common)
287
288		if len(q.ShowLabelsKey) > 0 {
289			ls = matchLabels(true, ls, q.ShowLabelsKey)
290		}
291
292		// Remove ignored labels
293		if len(q.IgnoreLabelsKey) > 0 {
294			ls = matchLabels(false, ls, q.IgnoreLabelsKey)
295		}
296
297		// Overwrite existing Labels
298		streams[i].Labels = ls
299
300		// Update max labels length
301		len := len(ls.String())
302		if maxLabelsLen < len {
303			maxLabelsLen = len
304		}
305	}
306
307	// sort and display entries
308	allEntries := make([]streamEntryPair, 0)
309
310	for _, s := range streams {
311		for _, e := range s.Entries {
312			allEntries = append(allEntries, streamEntryPair{
313				entry:  e,
314				labels: s.Labels,
315			})
316		}
317	}
318
319	if len(allEntries) == 0 {
320		return 0, nil
321	}
322
323	if q.Forward {
324		sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) })
325	} else {
326		sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) })
327	}
328
329	printed := 0
330	for _, e := range allEntries {
331		// Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch
332		if len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp {
333			skip := false
334			// Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp
335			// shared by multiple entries we have to check all that were stored to see if we've already
336			// printed them.
337			for _, le := range lastEntry {
338				if e.entry.Line == le.Line {
339					skip = true
340				}
341			}
342			if skip {
343				continue
344			}
345		}
346		out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)
347		printed++
348	}
349
350	// Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends
351	// with an entry that shared multiple timestamps, so we need to keep a list of all these entries
352	// because the next query is going to contain them too and we want to not duplicate anything already
353	// printed.
354	lel := []*loghttp.Entry{}
355	// Start with the timestamp of the last entry
356	le := allEntries[len(allEntries)-1].entry
357	for i, e := range allEntries {
358		// Save any entry which has this timestamp (most of the time this will only be the single last entry)
359		if e.entry.Timestamp.Equal(le.Timestamp) {
360			lel = append(lel, &allEntries[i].entry)
361		}
362	}
363
364	return printed, lel
365}
366
367func (q *Query) printMatrix(matrix loghttp.Matrix) {
368	// yes we are effectively unmarshalling and then immediately marshalling this object back to json.  we are doing this b/c
369	// it gives us more flexibility with regard to output types in the future.  initially we are supporting just formatted json but eventually
370	// we might add output options such as render to an image file on disk
371	bytes, err := json.MarshalIndent(matrix, "", "  ")
372	if err != nil {
373		log.Fatalf("Error marshalling matrix: %v", err)
374	}
375
376	fmt.Print(string(bytes))
377}
378
379func (q *Query) printVector(vector loghttp.Vector) {
380	bytes, err := json.MarshalIndent(vector, "", "  ")
381	if err != nil {
382		log.Fatalf("Error marshalling vector: %v", err)
383	}
384
385	fmt.Print(string(bytes))
386}
387
388func (q *Query) printScalar(scalar loghttp.Scalar) {
389	bytes, err := json.MarshalIndent(scalar, "", "  ")
390	if err != nil {
391		log.Fatalf("Error marshalling scalar: %v", err)
392	}
393
394	fmt.Print(string(bytes))
395}
396
397type kvLogger struct {
398	*tabwriter.Writer
399}
400
401func (k kvLogger) Log(keyvals ...interface{}) error {
402	for i := 0; i < len(keyvals); i += 2 {
403		fmt.Fprintln(k.Writer, color.BlueString("%s", keyvals[i]), "\t", fmt.Sprintf("%v", keyvals[i+1]))
404	}
405	k.Flush()
406	return nil
407}
408
409func (q *Query) printStats(stats stats.Result) {
410	writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0)
411	stats.Log(kvLogger{Writer: writer})
412}
413
414func (q *Query) resultsDirection() logproto.Direction {
415	if q.Forward {
416		return logproto.FORWARD
417	}
418	return logproto.BACKWARD
419}
420