1//+build ignore
2
3package query
4
5import (
6	"bufio"
7	"context"
8	"errors"
9	"flag"
10	"fmt"
11	"io"
12	"os"
13	"path/filepath"
14	"strconv"
15	"strings"
16	"time"
17
18	"github.com/gogo/protobuf/types"
19	"github.com/influxdata/influxdb/models"
20	"github.com/influxdata/influxdb/services/storage"
21	"github.com/influxdata/influxdb/storage/reads/datatypes"
22	"github.com/influxdata/influxql"
23	"go.uber.org/zap"
24	"google.golang.org/grpc"
25)
26
27// Command represents the program execution for "store query".
28type Command struct {
29	// Standard input/output, overridden for testing.
30	Stderr io.Writer
31	Stdout io.Writer
32	Logger *zap.Logger
33
34	addr            string
35	cpuProfile      string
36	memProfile      string
37	database        string
38	retentionPolicy string
39	startTime       int64
40	endTime         int64
41	limit           int64
42	slimit          int64
43	soffset         int64
44	desc            bool
45	silent          bool
46	expr            string
47	agg             string
48	groupArg        string
49	group           datatypes.ReadRequest_Group
50	groupKeys       string
51	keys            []string
52	hintsArg        string
53	hints           datatypes.HintFlags
54
55	aggType datatypes.Aggregate_AggregateType
56
57	// response
58	integerSum  int64
59	unsignedSum uint64
60	floatSum    float64
61	pointCount  uint64
62}
63
64// NewCommand returns a new instance of Command.
65func NewCommand() *Command {
66	return &Command{
67		Stderr: os.Stderr,
68		Stdout: os.Stdout,
69	}
70}
71
// parseTime converts a command-line time argument into nanoseconds since the
// Unix epoch.
//
// Two forms are accepted, tried in this order:
//   - an RFC3339 timestamp, e.g. "2018-01-02T15:04:05Z"
//   - a base-10 integer, returned unchanged
//
// If v matches neither form, the returned error quotes v so the caller can
// tell which argument was rejected.
func parseTime(v string) (int64, error) {
	if ts, err := time.Parse(time.RFC3339, v); err == nil {
		return ts.UnixNano(), nil
	}

	if nanos, err := strconv.ParseInt(v, 10, 64); err == nil {
		return nanos, nil
	}

	return 0, fmt.Errorf("invalid time %q: expected RFC3339 timestamp or integer nanoseconds", v)
}
83
84// Run executes the command.
85func (cmd *Command) Run(args ...string) error {
86	var start, end string
87	fs := flag.NewFlagSet("query", flag.ExitOnError)
88	fs.StringVar(&cmd.cpuProfile, "cpuprofile", "", "CPU profile name")
89	fs.StringVar(&cmd.memProfile, "memprofile", "", "memory profile name")
90	fs.StringVar(&cmd.addr, "addr", ":8082", "the RPC address")
91	fs.StringVar(&cmd.database, "database", "", "the database to query")
92	fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to query")
93	fs.StringVar(&start, "start", "", "Optional: the start time to query (RFC3339 format)")
94	fs.StringVar(&end, "end", "", "Optional: the end time to query (RFC3339 format)")
95	fs.Int64Var(&cmd.slimit, "slimit", 0, "Optional: limit number of series")
96	fs.Int64Var(&cmd.soffset, "soffset", 0, "Optional: start offset for series")
97	fs.Int64Var(&cmd.limit, "limit", 0, "Optional: limit number of values per series (-1 to return series only)")
98	fs.BoolVar(&cmd.desc, "desc", false, "Optional: return results in descending order")
99	fs.BoolVar(&cmd.silent, "silent", false, "silence output")
100	fs.StringVar(&cmd.expr, "expr", "", "InfluxQL conditional expression")
101	fs.StringVar(&cmd.agg, "agg", "", "aggregate functions (sum, count)")
102	fs.StringVar(&cmd.groupArg, "group", "none", "group operation (none,all,by,except,disable)")
103	fs.StringVar(&cmd.groupKeys, "group-keys", "", "comma-separated list of tags to specify series order")
104	fs.StringVar(&cmd.hintsArg, "hints", "none", "comma-separated list of read hints (none,no_points,no_series)")
105
106	fs.SetOutput(cmd.Stdout)
107	fs.Usage = func() {
108		fmt.Fprintln(cmd.Stdout, "Query via RPC")
109		fmt.Fprintf(cmd.Stdout, "Usage: %s query [flags]\n\n", filepath.Base(os.Args[0]))
110		fs.PrintDefaults()
111	}
112
113	if err := fs.Parse(args); err != nil {
114		return err
115	}
116
117	// set defaults
118	if start != "" {
119		t, err := parseTime(start)
120		if err != nil {
121			return err
122		}
123		cmd.startTime = t
124
125	} else {
126		cmd.startTime = models.MinNanoTime
127	}
128	if end != "" {
129		t, err := parseTime(end)
130		if err != nil {
131			return err
132		}
133		cmd.endTime = t
134
135	} else {
136		// set end time to max if it is not set.
137		cmd.endTime = models.MaxNanoTime
138	}
139
140	if cmd.groupKeys != "" {
141		cmd.keys = strings.Split(cmd.groupKeys, ",")
142	}
143
144	if err := cmd.validate(); err != nil {
145		return err
146	}
147
148	conn, err := grpc.Dial(cmd.addr, grpc.WithInsecure())
149	if err != nil {
150		return err
151	}
152	defer conn.Close()
153
154	return cmd.query(datatypes.NewStorageClient(conn))
155}
156
157func (cmd *Command) validate() error {
158	if cmd.database == "" {
159		return fmt.Errorf("must specify a database")
160	}
161	if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime {
162		return fmt.Errorf("end time before start time")
163	}
164
165	if cmd.agg != "" {
166		agg, ok := datatypes.Aggregate_AggregateType_value[strings.ToUpper(cmd.agg)]
167		if !ok {
168			return errors.New("invalid aggregate function: " + cmd.agg)
169		}
170		cmd.aggType = datatypes.Aggregate_AggregateType(agg)
171	}
172
173	group, ok := datatypes.ReadRequest_Group_value["GROUP_"+strings.ToUpper(cmd.groupArg)]
174	if !ok {
175		return errors.New("invalid group type: " + cmd.groupArg)
176	}
177	cmd.group = datatypes.ReadRequest_Group(group)
178
179	for _, h := range strings.Split(cmd.hintsArg, ",") {
180		cmd.hints |= datatypes.HintFlags(datatypes.ReadRequest_HintFlags_value["HINT_"+strings.ToUpper(h)])
181	}
182
183	return nil
184}
185
186func (cmd *Command) query(c datatypes.StorageClient) error {
187	src := storage.ReadSource{
188		Database:        cmd.database,
189		RetentionPolicy: cmd.retentionPolicy,
190	}
191
192	var req datatypes.ReadRequest
193	if any, err := types.MarshalAny(&src); err != nil {
194		return err
195	} else {
196		req.ReadSource = any
197	}
198	req.TimestampRange.Start = cmd.startTime
199	req.TimestampRange.End = cmd.endTime
200	req.SeriesLimit = cmd.slimit
201	req.SeriesOffset = cmd.soffset
202	req.PointsLimit = cmd.limit
203	req.Descending = cmd.desc
204	req.Group = cmd.group
205	req.GroupKeys = cmd.keys
206	req.Hints = cmd.hints
207
208	if cmd.aggType != datatypes.AggregateTypeNone {
209		req.Aggregate = &datatypes.Aggregate{Type: cmd.aggType}
210	}
211
212	if cmd.expr != "" {
213		expr, err := influxql.ParseExpr(cmd.expr)
214		if err != nil {
215			return nil
216		}
217		fmt.Fprintln(cmd.Stdout, expr)
218		var v exprToNodeVisitor
219		influxql.Walk(&v, expr)
220		if v.Err() != nil {
221			return v.Err()
222		}
223
224		req.Predicate = &datatypes.Predicate{Root: v.nodes[0]}
225	}
226
227	stream, err := c.Read(context.Background(), &req)
228	if err != nil {
229		fmt.Fprintln(cmd.Stdout, err)
230		return err
231	}
232
233	wr := bufio.NewWriter(os.Stdout)
234
235	now := time.Now()
236	defer func() {
237		dur := time.Since(now)
238		fmt.Fprintf(cmd.Stdout, "time: %v\n", dur)
239	}()
240
241	for {
242		var rep datatypes.ReadResponse
243
244		if err = stream.RecvMsg(&rep); err != nil {
245			if err == io.EOF {
246				break
247			}
248
249			return err
250		}
251
252		if cmd.silent {
253			cmd.processFramesSilent(rep.Frames)
254		} else {
255			cmd.processFrames(wr, rep.Frames)
256		}
257	}
258
259	fmt.Fprintln(cmd.Stdout)
260	fmt.Fprint(cmd.Stdout, "points(count): ", cmd.pointCount, ", sum(int64): ", cmd.integerSum, ", sum(uint64): ", cmd.unsignedSum, ", sum(float64): ", cmd.floatSum, "\n")
261
262	return nil
263}
264
265func (cmd *Command) processFramesSilent(frames []datatypes.ReadResponse_Frame) {
266	for _, frame := range frames {
267		switch f := frame.Data.(type) {
268		case *datatypes.ReadResponse_Frame_IntegerPoints:
269			for _, v := range f.IntegerPoints.Values {
270				cmd.integerSum += v
271			}
272			cmd.pointCount += uint64(len(f.IntegerPoints.Values))
273
274		case *datatypes.ReadResponse_Frame_UnsignedPoints:
275			for _, v := range f.UnsignedPoints.Values {
276				cmd.unsignedSum += v
277			}
278			cmd.pointCount += uint64(len(f.UnsignedPoints.Values))
279
280		case *datatypes.ReadResponse_Frame_FloatPoints:
281			for _, v := range f.FloatPoints.Values {
282				cmd.floatSum += v
283			}
284			cmd.pointCount += uint64(len(f.FloatPoints.Values))
285
286		case *datatypes.ReadResponse_Frame_StringPoints:
287			cmd.pointCount += uint64(len(f.StringPoints.Values))
288
289		case *datatypes.ReadResponse_Frame_BooleanPoints:
290			cmd.pointCount += uint64(len(f.BooleanPoints.Values))
291		}
292	}
293}
294
// printByteSlice writes the elements of v to wr as a comma-separated list
// enclosed in square brackets and terminated by a newline. The elements are
// wrapped in the ANSI escape sequences "\033[36m" and "\033[0m". The writer
// is not flushed.
func printByteSlice(wr *bufio.Writer, v [][]byte) {
	wr.WriteString("[\033[36m")
	for i, item := range v {
		if i > 0 {
			wr.WriteByte(',')
		}
		wr.Write(item)
	}
	wr.WriteString("\033[0m]\n")
}
308
309func (cmd *Command) processFrames(wr *bufio.Writer, frames []datatypes.ReadResponse_Frame) {
310	var buf [1024]byte
311	var line []byte
312
313	for _, frame := range frames {
314		switch f := frame.Data.(type) {
315		case *datatypes.ReadResponse_Frame_Group:
316			g := f.Group
317			wr.WriteString("partition values")
318			printByteSlice(wr, g.PartitionKeyVals)
319			wr.WriteString("group keys")
320			printByteSlice(wr, g.TagKeys)
321			wr.Flush()
322
323		case *datatypes.ReadResponse_Frame_Series:
324			s := f.Series
325			wr.WriteString("\033[36m")
326			first := true
327			for _, t := range s.Tags {
328				if !first {
329					wr.WriteByte(',')
330				} else {
331					first = false
332				}
333				wr.Write(t.Key)
334				wr.WriteByte(':')
335				wr.Write(t.Value)
336			}
337			wr.WriteString("\033[0m\n")
338			wr.Flush()
339
340		case *datatypes.ReadResponse_Frame_IntegerPoints:
341			p := f.IntegerPoints
342			for i := 0; i < len(p.Timestamps); i++ {
343				line = buf[:0]
344				wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10))
345				wr.WriteByte(' ')
346
347				line = buf[:0]
348				wr.Write(strconv.AppendInt(line, p.Values[i], 10))
349				wr.WriteString("\n")
350				wr.Flush()
351
352				cmd.integerSum += p.Values[i]
353			}
354			cmd.pointCount += uint64(len(f.IntegerPoints.Values))
355
356		case *datatypes.ReadResponse_Frame_UnsignedPoints:
357			p := f.UnsignedPoints
358			for i := 0; i < len(p.Timestamps); i++ {
359				line = buf[:0]
360				wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10))
361				wr.WriteByte(' ')
362
363				line = buf[:0]
364				wr.Write(strconv.AppendUint(line, p.Values[i], 10))
365				wr.WriteString("\n")
366				wr.Flush()
367
368				cmd.unsignedSum += p.Values[i]
369			}
370			cmd.pointCount += uint64(len(f.UnsignedPoints.Values))
371
372		case *datatypes.ReadResponse_Frame_FloatPoints:
373			p := f.FloatPoints
374			for i := 0; i < len(p.Timestamps); i++ {
375				line = buf[:0]
376				wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10))
377				wr.WriteByte(' ')
378
379				line = buf[:0]
380				wr.Write(strconv.AppendFloat(line, p.Values[i], 'f', 10, 64))
381				wr.WriteString("\n")
382				wr.Flush()
383
384				cmd.floatSum += p.Values[i]
385			}
386			cmd.pointCount += uint64(len(f.FloatPoints.Values))
387
388		case *datatypes.ReadResponse_Frame_StringPoints:
389			p := f.StringPoints
390			for i := 0; i < len(p.Timestamps); i++ {
391				line = buf[:0]
392				wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10))
393				wr.WriteByte(' ')
394
395				wr.WriteString(p.Values[i])
396				wr.WriteString("\n")
397				wr.Flush()
398			}
399			cmd.pointCount += uint64(len(f.StringPoints.Values))
400
401		case *datatypes.ReadResponse_Frame_BooleanPoints:
402			p := f.BooleanPoints
403			for i := 0; i < len(p.Timestamps); i++ {
404				line = buf[:0]
405				wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10))
406				wr.WriteByte(' ')
407
408				if p.Values[i] {
409					wr.WriteString("true")
410				} else {
411					wr.WriteString("false")
412				}
413				wr.WriteString("\n")
414				wr.Flush()
415			}
416			cmd.pointCount += uint64(len(f.BooleanPoints.Values))
417		}
418	}
419}
420
421type exprToNodeVisitor struct {
422	nodes []*datatypes.Node
423	err   error
424}
425
426func (v *exprToNodeVisitor) Err() error {
427	return v.err
428}
429
430func (v *exprToNodeVisitor) pop() (top *datatypes.Node) {
431	if len(v.nodes) < 1 {
432		panic("exprToNodeVisitor: stack empty")
433	}
434
435	top, v.nodes = v.nodes[len(v.nodes)-1], v.nodes[:len(v.nodes)-1]
436	return
437}
438
439func (v *exprToNodeVisitor) pop2() (lhs, rhs *datatypes.Node) {
440	if len(v.nodes) < 2 {
441		panic("exprToNodeVisitor: stack empty")
442	}
443
444	rhs = v.nodes[len(v.nodes)-1]
445	lhs = v.nodes[len(v.nodes)-2]
446	v.nodes = v.nodes[:len(v.nodes)-2]
447	return
448}
449
450func mapOpToComparison(op influxql.Token) datatypes.Node_Comparison {
451	switch op {
452	case influxql.EQ:
453		return datatypes.ComparisonEqual
454	case influxql.EQREGEX:
455		return datatypes.ComparisonRegex
456	case influxql.NEQ:
457		return datatypes.ComparisonNotEqual
458	case influxql.NEQREGEX:
459		return datatypes.ComparisonNotEqual
460	case influxql.LT:
461		return datatypes.ComparisonLess
462	case influxql.LTE:
463		return datatypes.ComparisonLessEqual
464	case influxql.GT:
465		return datatypes.ComparisonGreater
466	case influxql.GTE:
467		return datatypes.ComparisonGreaterEqual
468
469	default:
470		return -1
471	}
472}
473
474func (v *exprToNodeVisitor) Visit(node influxql.Node) influxql.Visitor {
475	switch n := node.(type) {
476	case *influxql.BinaryExpr:
477		if v.err != nil {
478			return nil
479		}
480
481		influxql.Walk(v, n.LHS)
482		if v.err != nil {
483			return nil
484		}
485
486		influxql.Walk(v, n.RHS)
487		if v.err != nil {
488			return nil
489		}
490
491		if comp := mapOpToComparison(n.Op); comp != -1 {
492			lhs, rhs := v.pop2()
493			v.nodes = append(v.nodes, &datatypes.Node{
494				NodeType: datatypes.NodeTypeComparisonExpression,
495				Value:    &datatypes.Node_Comparison_{Comparison: comp},
496				Children: []*datatypes.Node{lhs, rhs},
497			})
498		} else if n.Op == influxql.AND || n.Op == influxql.OR {
499			var op datatypes.Node_Logical
500			if n.Op == influxql.AND {
501				op = datatypes.LogicalAnd
502			} else {
503				op = datatypes.LogicalOr
504			}
505
506			lhs, rhs := v.pop2()
507			v.nodes = append(v.nodes, &datatypes.Node{
508				NodeType: datatypes.NodeTypeLogicalExpression,
509				Value:    &datatypes.Node_Logical_{Logical: op},
510				Children: []*datatypes.Node{lhs, rhs},
511			})
512		} else {
513			v.err = fmt.Errorf("unsupported operator, %s", n.Op)
514		}
515
516		return nil
517
518	case *influxql.ParenExpr:
519		influxql.Walk(v, n.Expr)
520		if v.err != nil {
521			return nil
522		}
523
524		v.nodes = append(v.nodes, &datatypes.Node{
525			NodeType: datatypes.NodeTypeParenExpression,
526			Children: []*datatypes.Node{v.pop()},
527		})
528		return nil
529
530	case *influxql.StringLiteral:
531		v.nodes = append(v.nodes, &datatypes.Node{
532			NodeType: datatypes.NodeTypeLiteral,
533			Value:    &datatypes.Node_StringValue{StringValue: n.Val},
534		})
535		return nil
536
537	case *influxql.NumberLiteral:
538		v.nodes = append(v.nodes, &datatypes.Node{
539			NodeType: datatypes.NodeTypeLiteral,
540			Value:    &datatypes.Node_FloatValue{FloatValue: n.Val},
541		})
542		return nil
543
544	case *influxql.IntegerLiteral:
545		v.nodes = append(v.nodes, &datatypes.Node{
546			NodeType: datatypes.NodeTypeLiteral,
547			Value:    &datatypes.Node_IntegerValue{IntegerValue: n.Val},
548		})
549		return nil
550
551	case *influxql.UnsignedLiteral:
552		v.nodes = append(v.nodes, &datatypes.Node{
553			NodeType: datatypes.NodeTypeLiteral,
554			Value:    &datatypes.Node_UnsignedValue{UnsignedValue: n.Val},
555		})
556		return nil
557
558	case *influxql.VarRef:
559		v.nodes = append(v.nodes, &datatypes.Node{
560			NodeType: datatypes.NodeTypeTagRef,
561			Value:    &datatypes.Node_TagRefValue{TagRefValue: n.Val},
562		})
563		return nil
564
565	case *influxql.RegexLiteral:
566		v.nodes = append(v.nodes, &datatypes.Node{
567			NodeType: datatypes.NodeTypeLiteral,
568			Value:    &datatypes.Node_RegexValue{RegexValue: n.Val.String()},
569		})
570		return nil
571	default:
572		v.err = fmt.Errorf("unsupported expression %T", n)
573		return nil
574	}
575}
576