1//+build ignore 2 3package query 4 5import ( 6 "bufio" 7 "context" 8 "errors" 9 "flag" 10 "fmt" 11 "io" 12 "os" 13 "path/filepath" 14 "strconv" 15 "strings" 16 "time" 17 18 "github.com/gogo/protobuf/types" 19 "github.com/influxdata/influxdb/models" 20 "github.com/influxdata/influxdb/services/storage" 21 "github.com/influxdata/influxdb/storage/reads/datatypes" 22 "github.com/influxdata/influxql" 23 "go.uber.org/zap" 24 "google.golang.org/grpc" 25) 26 27// Command represents the program execution for "store query". 28type Command struct { 29 // Standard input/output, overridden for testing. 30 Stderr io.Writer 31 Stdout io.Writer 32 Logger *zap.Logger 33 34 addr string 35 cpuProfile string 36 memProfile string 37 database string 38 retentionPolicy string 39 startTime int64 40 endTime int64 41 limit int64 42 slimit int64 43 soffset int64 44 desc bool 45 silent bool 46 expr string 47 agg string 48 groupArg string 49 group datatypes.ReadRequest_Group 50 groupKeys string 51 keys []string 52 hintsArg string 53 hints datatypes.HintFlags 54 55 aggType datatypes.Aggregate_AggregateType 56 57 // response 58 integerSum int64 59 unsignedSum uint64 60 floatSum float64 61 pointCount uint64 62} 63 64// NewCommand returns a new instance of Command. 65func NewCommand() *Command { 66 return &Command{ 67 Stderr: os.Stderr, 68 Stdout: os.Stdout, 69 } 70} 71 72func parseTime(v string) (int64, error) { 73 if s, err := time.Parse(time.RFC3339, v); err == nil { 74 return s.UnixNano(), nil 75 } 76 77 if i, err := strconv.ParseInt(v, 10, 64); err == nil { 78 return i, nil 79 } 80 81 return 0, errors.New("invalid time") 82} 83 84// Run executes the command. 85func (cmd *Command) Run(args ...string) error { 86 var start, end string 87 fs := flag.NewFlagSet("query", flag.ExitOnError) 88 fs.StringVar(&cmd.cpuProfile, "cpuprofile", "", "CPU profile name") 89 fs.StringVar(&cmd.memProfile, "memprofile", "", "memory profile name") 90 fs.StringVar(&cmd.addr, "addr", ":8082", "the RPC address") 91 fs.StringVar(&cmd.database, "database", "", "the database to query") 92 fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to query") 93 fs.StringVar(&start, "start", "", "Optional: the start time to query (RFC3339 format)") 94 fs.StringVar(&end, "end", "", "Optional: the end time to query (RFC3339 format)") 95 fs.Int64Var(&cmd.slimit, "slimit", 0, "Optional: limit number of series") 96 fs.Int64Var(&cmd.soffset, "soffset", 0, "Optional: start offset for series") 97 fs.Int64Var(&cmd.limit, "limit", 0, "Optional: limit number of values per series (-1 to return series only)") 98 fs.BoolVar(&cmd.desc, "desc", false, "Optional: return results in descending order") 99 fs.BoolVar(&cmd.silent, "silent", false, "silence output") 100 fs.StringVar(&cmd.expr, "expr", "", "InfluxQL conditional expression") 101 fs.StringVar(&cmd.agg, "agg", "", "aggregate functions (sum, count)") 102 fs.StringVar(&cmd.groupArg, "group", "none", "group operation (none,all,by,except,disable)") 103 fs.StringVar(&cmd.groupKeys, "group-keys", "", "comma-separated list of tags to specify series order") 104 fs.StringVar(&cmd.hintsArg, "hints", "none", "comma-separated list of read hints (none,no_points,no_series)") 105 106 fs.SetOutput(cmd.Stdout) 107 fs.Usage = func() { 108 fmt.Fprintln(cmd.Stdout, "Query via RPC") 109 fmt.Fprintf(cmd.Stdout, "Usage: %s query [flags]\n\n", filepath.Base(os.Args[0])) 110 fs.PrintDefaults() 111 } 112 113 if err := fs.Parse(args); err != nil { 114 return err 115 } 116 117 // set defaults 118 if start != "" { 119 t, err := parseTime(start) 120 if err != nil { 121 return err 122 } 123 cmd.startTime = t 124 125 } else { 126 cmd.startTime = models.MinNanoTime 127 } 128 if end != "" { 129 t, err := parseTime(end) 130 if err != nil { 131 return err 132 } 133 cmd.endTime = t 134 135 } else { 136 // set end time to max if it is not set. 137 cmd.endTime = models.MaxNanoTime 138 } 139 140 if cmd.groupKeys != "" { 141 cmd.keys = strings.Split(cmd.groupKeys, ",") 142 } 143 144 if err := cmd.validate(); err != nil { 145 return err 146 } 147 148 conn, err := grpc.Dial(cmd.addr, grpc.WithInsecure()) 149 if err != nil { 150 return err 151 } 152 defer conn.Close() 153 154 return cmd.query(datatypes.NewStorageClient(conn)) 155} 156 157func (cmd *Command) validate() error { 158 if cmd.database == "" { 159 return fmt.Errorf("must specify a database") 160 } 161 if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime { 162 return fmt.Errorf("end time before start time") 163 } 164 165 if cmd.agg != "" { 166 agg, ok := datatypes.Aggregate_AggregateType_value[strings.ToUpper(cmd.agg)] 167 if !ok { 168 return errors.New("invalid aggregate function: " + cmd.agg) 169 } 170 cmd.aggType = datatypes.Aggregate_AggregateType(agg) 171 } 172 173 group, ok := datatypes.ReadRequest_Group_value["GROUP_"+strings.ToUpper(cmd.groupArg)] 174 if !ok { 175 return errors.New("invalid group type: " + cmd.groupArg) 176 } 177 cmd.group = datatypes.ReadRequest_Group(group) 178 179 for _, h := range strings.Split(cmd.hintsArg, ",") { 180 cmd.hints |= datatypes.HintFlags(datatypes.ReadRequest_HintFlags_value["HINT_"+strings.ToUpper(h)]) 181 } 182 183 return nil 184} 185 186func (cmd *Command) query(c datatypes.StorageClient) error { 187 src := storage.ReadSource{ 188 Database: cmd.database, 189 RetentionPolicy: cmd.retentionPolicy, 190 } 191 192 var req datatypes.ReadRequest 193 if any, err := types.MarshalAny(&src); err != nil { 194 return err 195 } else { 196 req.ReadSource = any 197 } 198 req.TimestampRange.Start = cmd.startTime 199 req.TimestampRange.End = cmd.endTime 200 req.SeriesLimit = cmd.slimit 201 req.SeriesOffset = cmd.soffset 202 req.PointsLimit = cmd.limit 203 req.Descending = cmd.desc 204 req.Group = cmd.group 205 req.GroupKeys = cmd.keys 206 req.Hints = cmd.hints 207 208 if cmd.aggType != datatypes.AggregateTypeNone { 209 req.Aggregate = &datatypes.Aggregate{Type: cmd.aggType} 210 } 211 212 if cmd.expr != "" { 213 expr, err := influxql.ParseExpr(cmd.expr) 214 if err != nil { 215 return nil 216 } 217 fmt.Fprintln(cmd.Stdout, expr) 218 var v exprToNodeVisitor 219 influxql.Walk(&v, expr) 220 if v.Err() != nil { 221 return v.Err() 222 } 223 224 req.Predicate = &datatypes.Predicate{Root: v.nodes[0]} 225 } 226 227 stream, err := c.Read(context.Background(), &req) 228 if err != nil { 229 fmt.Fprintln(cmd.Stdout, err) 230 return err 231 } 232 233 wr := bufio.NewWriter(os.Stdout) 234 235 now := time.Now() 236 defer func() { 237 dur := time.Since(now) 238 fmt.Fprintf(cmd.Stdout, "time: %v\n", dur) 239 }() 240 241 for { 242 var rep datatypes.ReadResponse 243 244 if err = stream.RecvMsg(&rep); err != nil { 245 if err == io.EOF { 246 break 247 } 248 249 return err 250 } 251 252 if cmd.silent { 253 cmd.processFramesSilent(rep.Frames) 254 } else { 255 cmd.processFrames(wr, rep.Frames) 256 } 257 } 258 259 fmt.Fprintln(cmd.Stdout) 260 fmt.Fprint(cmd.Stdout, "points(count): ", cmd.pointCount, ", sum(int64): ", cmd.integerSum, ", sum(uint64): ", cmd.unsignedSum, ", sum(float64): ", cmd.floatSum, "\n") 261 262 return nil 263} 264 265func (cmd *Command) processFramesSilent(frames []datatypes.ReadResponse_Frame) { 266 for _, frame := range frames { 267 switch f := frame.Data.(type) { 268 case *datatypes.ReadResponse_Frame_IntegerPoints: 269 for _, v := range f.IntegerPoints.Values { 270 cmd.integerSum += v 271 } 272 cmd.pointCount += uint64(len(f.IntegerPoints.Values)) 273 274 case *datatypes.ReadResponse_Frame_UnsignedPoints: 275 for _, v := range f.UnsignedPoints.Values { 276 cmd.unsignedSum += v 277 } 278 cmd.pointCount += uint64(len(f.UnsignedPoints.Values)) 279 280 case *datatypes.ReadResponse_Frame_FloatPoints: 281 for _, v := range f.FloatPoints.Values { 282 cmd.floatSum += v 283 } 284 cmd.pointCount += uint64(len(f.FloatPoints.Values)) 285 286 case *datatypes.ReadResponse_Frame_StringPoints: 287 cmd.pointCount += uint64(len(f.StringPoints.Values)) 288 289 case *datatypes.ReadResponse_Frame_BooleanPoints: 290 cmd.pointCount += uint64(len(f.BooleanPoints.Values)) 291 } 292 } 293} 294 295func printByteSlice(wr *bufio.Writer, v [][]byte) { 296 wr.WriteString("[\033[36m") 297 first := true 298 for _, t := range v { 299 if !first { 300 wr.WriteByte(',') 301 } else { 302 first = false 303 } 304 wr.Write(t) 305 } 306 wr.WriteString("\033[0m]\n") 307} 308 309func (cmd *Command) processFrames(wr *bufio.Writer, frames []datatypes.ReadResponse_Frame) { 310 var buf [1024]byte 311 var line []byte 312 313 for _, frame := range frames { 314 switch f := frame.Data.(type) { 315 case *datatypes.ReadResponse_Frame_Group: 316 g := f.Group 317 wr.WriteString("partition values") 318 printByteSlice(wr, g.PartitionKeyVals) 319 wr.WriteString("group keys") 320 printByteSlice(wr, g.TagKeys) 321 wr.Flush() 322 323 case *datatypes.ReadResponse_Frame_Series: 324 s := f.Series 325 wr.WriteString("\033[36m") 326 first := true 327 for _, t := range s.Tags { 328 if !first { 329 wr.WriteByte(',') 330 } else { 331 first = false 332 } 333 wr.Write(t.Key) 334 wr.WriteByte(':') 335 wr.Write(t.Value) 336 } 337 wr.WriteString("\033[0m\n") 338 wr.Flush() 339 340 case *datatypes.ReadResponse_Frame_IntegerPoints: 341 p := f.IntegerPoints 342 for i := 0; i < len(p.Timestamps); i++ { 343 line = buf[:0] 344 wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10)) 345 wr.WriteByte(' ') 346 347 line = buf[:0] 348 wr.Write(strconv.AppendInt(line, p.Values[i], 10)) 349 wr.WriteString("\n") 350 wr.Flush() 351 352 cmd.integerSum += p.Values[i] 353 } 354 cmd.pointCount += uint64(len(f.IntegerPoints.Values)) 355 356 case *datatypes.ReadResponse_Frame_UnsignedPoints: 357 p := f.UnsignedPoints 358 for i := 0; i < len(p.Timestamps); i++ { 359 line = buf[:0] 360 wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10)) 361 wr.WriteByte(' ') 362 363 line = buf[:0] 364 wr.Write(strconv.AppendUint(line, p.Values[i], 10)) 365 wr.WriteString("\n") 366 wr.Flush() 367 368 cmd.unsignedSum += p.Values[i] 369 } 370 cmd.pointCount += uint64(len(f.UnsignedPoints.Values)) 371 372 case *datatypes.ReadResponse_Frame_FloatPoints: 373 p := f.FloatPoints 374 for i := 0; i < len(p.Timestamps); i++ { 375 line = buf[:0] 376 wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10)) 377 wr.WriteByte(' ') 378 379 line = buf[:0] 380 wr.Write(strconv.AppendFloat(line, p.Values[i], 'f', 10, 64)) 381 wr.WriteString("\n") 382 wr.Flush() 383 384 cmd.floatSum += p.Values[i] 385 } 386 cmd.pointCount += uint64(len(f.FloatPoints.Values)) 387 388 case *datatypes.ReadResponse_Frame_StringPoints: 389 p := f.StringPoints 390 for i := 0; i < len(p.Timestamps); i++ { 391 line = buf[:0] 392 wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10)) 393 wr.WriteByte(' ') 394 395 wr.WriteString(p.Values[i]) 396 wr.WriteString("\n") 397 wr.Flush() 398 } 399 cmd.pointCount += uint64(len(f.StringPoints.Values)) 400 401 case *datatypes.ReadResponse_Frame_BooleanPoints: 402 p := f.BooleanPoints 403 for i := 0; i < len(p.Timestamps); i++ { 404 line = buf[:0] 405 wr.Write(strconv.AppendInt(line, p.Timestamps[i], 10)) 406 wr.WriteByte(' ') 407 408 if p.Values[i] { 409 wr.WriteString("true") 410 } else { 411 wr.WriteString("false") 412 } 413 wr.WriteString("\n") 414 wr.Flush() 415 } 416 cmd.pointCount += uint64(len(f.BooleanPoints.Values)) 417 } 418 } 419} 420 421type exprToNodeVisitor struct { 422 nodes []*datatypes.Node 423 err error 424} 425 426func (v *exprToNodeVisitor) Err() error { 427 return v.err 428} 429 430func (v *exprToNodeVisitor) pop() (top *datatypes.Node) { 431 if len(v.nodes) < 1 { 432 panic("exprToNodeVisitor: stack empty") 433 } 434 435 top, v.nodes = v.nodes[len(v.nodes)-1], v.nodes[:len(v.nodes)-1] 436 return 437} 438 439func (v *exprToNodeVisitor) pop2() (lhs, rhs *datatypes.Node) { 440 if len(v.nodes) < 2 { 441 panic("exprToNodeVisitor: stack empty") 442 } 443 444 rhs = v.nodes[len(v.nodes)-1] 445 lhs = v.nodes[len(v.nodes)-2] 446 v.nodes = v.nodes[:len(v.nodes)-2] 447 return 448} 449 450func mapOpToComparison(op influxql.Token) datatypes.Node_Comparison { 451 switch op { 452 case influxql.EQ: 453 return datatypes.ComparisonEqual 454 case influxql.EQREGEX: 455 return datatypes.ComparisonRegex 456 case influxql.NEQ: 457 return datatypes.ComparisonNotEqual 458 case influxql.NEQREGEX: 459 return datatypes.ComparisonNotEqual 460 case influxql.LT: 461 return datatypes.ComparisonLess 462 case influxql.LTE: 463 return datatypes.ComparisonLessEqual 464 case influxql.GT: 465 return datatypes.ComparisonGreater 466 case influxql.GTE: 467 return datatypes.ComparisonGreaterEqual 468 469 default: 470 return -1 471 } 472} 473 474func (v *exprToNodeVisitor) Visit(node influxql.Node) influxql.Visitor { 475 switch n := node.(type) { 476 case *influxql.BinaryExpr: 477 if v.err != nil { 478 return nil 479 } 480 481 influxql.Walk(v, n.LHS) 482 if v.err != nil { 483 return nil 484 } 485 486 influxql.Walk(v, n.RHS) 487 if v.err != nil { 488 return nil 489 } 490 491 if comp := mapOpToComparison(n.Op); comp != -1 { 492 lhs, rhs := v.pop2() 493 v.nodes = append(v.nodes, &datatypes.Node{ 494 NodeType: datatypes.NodeTypeComparisonExpression, 495 Value: &datatypes.Node_Comparison_{Comparison: comp}, 496 Children: []*datatypes.Node{lhs, rhs}, 497 }) 498 } else if n.Op == influxql.AND || n.Op == influxql.OR { 499 var op datatypes.Node_Logical 500 if n.Op == influxql.AND { 501 op = datatypes.LogicalAnd 502 } else { 503 op = datatypes.LogicalOr 504 } 505 506 lhs, rhs := v.pop2() 507 v.nodes = append(v.nodes, &datatypes.Node{ 508 NodeType: datatypes.NodeTypeLogicalExpression, 509 Value: &datatypes.Node_Logical_{Logical: op}, 510 Children: []*datatypes.Node{lhs, rhs}, 511 }) 512 } else { 513 v.err = fmt.Errorf("unsupported operator, %s", n.Op) 514 } 515 516 return nil 517 518 case *influxql.ParenExpr: 519 influxql.Walk(v, n.Expr) 520 if v.err != nil { 521 return nil 522 } 523 524 v.nodes = append(v.nodes, &datatypes.Node{ 525 NodeType: datatypes.NodeTypeParenExpression, 526 Children: []*datatypes.Node{v.pop()}, 527 }) 528 return nil 529 530 case *influxql.StringLiteral: 531 v.nodes = append(v.nodes, &datatypes.Node{ 532 NodeType: datatypes.NodeTypeLiteral, 533 Value: &datatypes.Node_StringValue{StringValue: n.Val}, 534 }) 535 return nil 536 537 case *influxql.NumberLiteral: 538 v.nodes = append(v.nodes, &datatypes.Node{ 539 NodeType: datatypes.NodeTypeLiteral, 540 Value: &datatypes.Node_FloatValue{FloatValue: n.Val}, 541 }) 542 return nil 543 544 case *influxql.IntegerLiteral: 545 v.nodes = append(v.nodes, &datatypes.Node{ 546 NodeType: datatypes.NodeTypeLiteral, 547 Value: &datatypes.Node_IntegerValue{IntegerValue: n.Val}, 548 }) 549 return nil 550 551 case *influxql.UnsignedLiteral: 552 v.nodes = append(v.nodes, &datatypes.Node{ 553 NodeType: datatypes.NodeTypeLiteral, 554 Value: &datatypes.Node_UnsignedValue{UnsignedValue: n.Val}, 555 }) 556 return nil 557 558 case *influxql.VarRef: 559 v.nodes = append(v.nodes, &datatypes.Node{ 560 NodeType: datatypes.NodeTypeTagRef, 561 Value: &datatypes.Node_TagRefValue{TagRefValue: n.Val}, 562 }) 563 return nil 564 565 case *influxql.RegexLiteral: 566 v.nodes = append(v.nodes, &datatypes.Node{ 567 NodeType: datatypes.NodeTypeLiteral, 568 Value: &datatypes.Node_RegexValue{RegexValue: n.Val.String()}, 569 }) 570 return nil 571 default: 572 v.err = fmt.Errorf("unsupported expression %T", n) 573 return nil 574 } 575} 576