// Package cli contains the logic of the influx command line client.
package cli // import "github.com/influxdata/influxdb/cmd/influx/cli"

import (
	"bytes"
	"context"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"path/filepath"
	"reflect"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"syscall"
	"text/tabwriter"

	"golang.org/x/crypto/ssh/terminal"

	"github.com/influxdata/influxdb/client"
	v8 "github.com/influxdata/influxdb/importer/v8"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxql"
	"github.com/peterh/liner"
)

// ErrBlankCommand is returned when a parsed command is empty.
var ErrBlankCommand = errors.New("empty input")

// CommandLine holds CLI configuration and state.
type CommandLine struct {
	// Line is the interactive line editor. Run creates it only for interactive
	// sessions, so it is nil while -execute, import, or piped input is handled.
	Line *liner.State

	// URL is the server URL. Run derives it from Host, Port and PathPrefix, and
	// Connect overwrites it after a successful connection.
	URL url.URL

	// Host, Port and PathPrefix are combined by Run into the connection address.
	Host       string
	Port       int
	PathPrefix string

	// Database is the database context attached to queries and writes.
	Database string

	// Type selects the query language (InfluxQL or Flux).
	Type QueryLanguage

	// Ssl is passed to client.ParseConnectionString when building the URL.
	Ssl bool

	// RetentionPolicy is the retention policy context attached to queries and writes.
	RetentionPolicy string

	// ClientVersion is printed by Version and included in the User-Agent header.
	ClientVersion string

	// ServerVersion is the version reported by the server's ping response.
	ServerVersion string

	Pretty bool   // controls pretty print for json
	Format string // controls the output format. Valid values are json, csv, or column

	// Execute holds command(s) to run non-interactively; Run splits it on newlines.
	Execute string

	ShowVersion bool

	// Import makes Run hand control to the v8 importer instead of the prompt.
	Import bool

	// Chunked, ChunkSize and NodeID are copied into every client.Query.
	Chunked   bool
	ChunkSize int
	NodeID    int

	// Quit is closed by the exit/quit commands; the prompt loop stops once it
	// becomes readable.
	Quit chan struct{}

	IgnoreSignals bool // Ignore signals normally caught by this process (used primarily for testing)
	ForceTTY      bool // Force the CLI to act as if it were connected to a TTY

	// osSignals receives SIGINT/SIGTERM when signals are not ignored.
	osSignals chan os.Signal

	// historyFilePath is where command history is saved; empty disables saving.
	historyFilePath string

	Client         *client.Client
	ClientConfig   client.Config // Client config options.
	ImporterConfig v8.Config     // Importer configuration options.
}

// New returns an instance of CommandLine with the specified client version.
71func New(version string) *CommandLine { 72 return &CommandLine{ 73 ClientVersion: version, 74 Quit: make(chan struct{}, 1), 75 osSignals: make(chan os.Signal, 1), 76 Chunked: true, 77 } 78} 79 80// Run executes the CLI. 81func (c *CommandLine) Run() error { 82 hasTTY := c.ForceTTY || terminal.IsTerminal(int(os.Stdin.Fd())) 83 84 var promptForPassword bool 85 // determine if they set the password flag but provided no value 86 for _, v := range os.Args { 87 v = strings.ToLower(v) 88 if (strings.HasPrefix(v, "-password") || strings.HasPrefix(v, "--password")) && c.ClientConfig.Password == "" { 89 promptForPassword = true 90 break 91 } 92 } 93 94 // Check if we will be able to prompt for the password later. 95 if promptForPassword && !hasTTY { 96 return errors.New("unable to prompt for a password with no TTY") 97 } 98 99 // Read environment variables for username/password. 100 if c.ClientConfig.Username == "" { 101 c.ClientConfig.Username = os.Getenv("INFLUX_USERNAME") 102 } 103 // If we are going to be prompted for a password, always use the entered password. 104 if promptForPassword { 105 // Open the liner (temporarily) and prompt for the password. 106 p, e := func() (string, error) { 107 l := liner.NewLiner() 108 defer l.Close() 109 return l.PasswordPrompt("password: ") 110 }() 111 if e != nil { 112 return errors.New("Unable to parse password") 113 } 114 c.ClientConfig.Password = p 115 } else if c.ClientConfig.Password == "" { 116 c.ClientConfig.Password = os.Getenv("INFLUX_PASSWORD") 117 } 118 119 addr := fmt.Sprintf("%s:%d/%s", c.Host, c.Port, c.PathPrefix) 120 url, err := client.ParseConnectionString(addr, c.Ssl) 121 if err != nil { 122 return err 123 } 124 125 c.URL = url 126 127 if err := c.Connect(""); err != nil { 128 msg := "Please check your connection settings and ensure 'influxd' is running." 129 if !c.Ssl && strings.Contains(err.Error(), "malformed HTTP response") { 130 // Attempt to connect with SSL and disable secure SSL for this test. 
131 c.Ssl = true 132 unsafeSsl := c.ClientConfig.UnsafeSsl 133 c.ClientConfig.UnsafeSsl = true 134 if err := c.Connect(""); err == nil { 135 msg = "Please use the -ssl flag to connect using SSL." 136 } 137 c.Ssl = false 138 c.ClientConfig.UnsafeSsl = unsafeSsl 139 } else if c.Ssl && !c.ClientConfig.UnsafeSsl && strings.Contains(err.Error(), "certificate is valid for") { 140 // Attempt to connect with an insecure connection just to see if it works. 141 c.ClientConfig.UnsafeSsl = true 142 if err := c.Connect(""); err == nil { 143 msg = "You may use -unsafeSsl to connect anyway, but the SSL connection will not be secure." 144 } 145 c.ClientConfig.UnsafeSsl = false 146 } 147 return fmt.Errorf("Failed to connect to %s: %s\n%s", c.Client.Addr(), err.Error(), msg) 148 } 149 150 // Modify precision. 151 c.SetPrecision(c.ClientConfig.Precision) 152 153 if c.Execute != "" { 154 switch c.Type { 155 case QueryLanguageFlux: 156 return c.ExecuteFluxQuery(c.Execute) 157 default: 158 // Make the non-interactive mode send everything through the CLI's parser 159 // the same way the interactive mode works 160 lines := strings.Split(c.Execute, "\n") 161 for _, line := range lines { 162 if err := c.ParseCommand(line); err != nil { 163 return err 164 } 165 } 166 } 167 return nil 168 } 169 170 if c.Import { 171 // Copy the latest importer config and inject the latest client config 172 // into it. 
173 config := c.ImporterConfig 174 config.Config = c.ClientConfig 175 config.URL = c.URL 176 177 i := v8.NewImporter(config) 178 if err := i.Import(); err != nil { 179 err = fmt.Errorf("ERROR: %s", err) 180 return err 181 } 182 return nil 183 } 184 185 if !hasTTY { 186 cmd, err := ioutil.ReadAll(os.Stdin) 187 if err != nil { 188 return err 189 } 190 191 switch c.Type { 192 case QueryLanguageFlux: 193 return c.ExecuteFluxQuery(string(cmd)) 194 default: 195 return c.ExecuteQuery(string(cmd)) 196 } 197 } 198 199 if !c.IgnoreSignals { 200 // register OS signals for graceful termination 201 signal.Notify(c.osSignals, syscall.SIGINT, syscall.SIGTERM) 202 } 203 204 if len(c.ServerVersion) == 0 { 205 fmt.Printf("WARN: Connected to %s, but found no server version.\n", c.Client.Addr()) 206 fmt.Printf("Are you sure an InfluxDB server is listening at the given address?\n") 207 } else { 208 fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.ServerVersion) 209 } 210 211 c.Version() 212 213 if c.Type == QueryLanguageFlux { 214 repl, err := getFluxREPL(c.URL, c.ClientConfig.Username, c.ClientConfig.Password) 215 if err != nil { 216 return err 217 } 218 repl.Run() 219 os.Exit(0) 220 } 221 222 c.Line = liner.NewLiner() 223 defer c.Line.Close() 224 225 c.Line.SetMultiLineMode(true) 226 227 // Only load/write history if HOME environment variable is set. 228 var historyDir string 229 if runtime.GOOS == "windows" { 230 if userDir := os.Getenv("USERPROFILE"); userDir != "" { 231 historyDir = userDir 232 } 233 } 234 235 if homeDir := os.Getenv("HOME"); homeDir != "" { 236 historyDir = homeDir 237 } 238 239 // Attempt to load the history file. 
240 if historyDir != "" { 241 c.historyFilePath = filepath.Join(historyDir, ".influx_history") 242 if historyFile, err := os.Open(c.historyFilePath); err == nil { 243 c.Line.ReadHistory(historyFile) 244 historyFile.Close() 245 } 246 } 247 248 // read from prompt until exit is run 249 return c.mainLoop() 250} 251 252// mainLoop runs the main prompt loop for the CLI. 253func (c *CommandLine) mainLoop() error { 254 for { 255 select { 256 case <-c.osSignals: 257 c.exit() 258 return nil 259 case <-c.Quit: 260 c.exit() 261 return nil 262 default: 263 l, e := c.Line.Prompt("> ") 264 if e == io.EOF { 265 // Instead of die, register that someone exited the program gracefully 266 l = "exit" 267 } else if e != nil { 268 c.exit() 269 return e 270 } 271 if err := c.ParseCommand(l); err != ErrBlankCommand && !strings.HasPrefix(strings.TrimSpace(l), "auth") { 272 l = influxql.Sanitize(l) 273 c.Line.AppendHistory(l) 274 c.saveHistory() 275 } 276 } 277 } 278} 279 280// ParseCommand parses an instruction and calls the related method 281// or executes the command as a query against InfluxDB. 
282func (c *CommandLine) ParseCommand(cmd string) error { 283 lcmd := strings.TrimSpace(strings.ToLower(cmd)) 284 tokens := strings.Fields(lcmd) 285 286 if len(tokens) > 0 { 287 switch tokens[0] { 288 case "exit", "quit": 289 close(c.Quit) 290 case "gopher": 291 c.gopher() 292 case "connect": 293 return c.Connect(cmd) 294 case "auth": 295 c.SetAuth(cmd) 296 case "help": 297 c.help() 298 case "history": 299 c.history() 300 case "format": 301 c.SetFormat(cmd) 302 case "precision": 303 c.SetPrecision(cmd) 304 case "consistency": 305 c.SetWriteConsistency(cmd) 306 case "settings": 307 c.Settings() 308 case "chunked": 309 c.Chunked = !c.Chunked 310 if c.Chunked { 311 fmt.Println("chunked responses enabled") 312 } else { 313 fmt.Println("chunked reponses disabled") 314 } 315 case "chunk": 316 c.SetChunkSize(cmd) 317 case "pretty": 318 c.Pretty = !c.Pretty 319 if c.Pretty { 320 fmt.Println("Pretty print enabled") 321 } else { 322 fmt.Println("Pretty print disabled") 323 } 324 case "use": 325 c.use(cmd) 326 case "node": 327 c.node(cmd) 328 case "insert": 329 return c.Insert(cmd) 330 case "clear": 331 c.clear(cmd) 332 default: 333 return c.ExecuteQuery(cmd) 334 } 335 336 return nil 337 } 338 return ErrBlankCommand 339} 340 341// Connect connects to a server. 
342func (c *CommandLine) Connect(cmd string) error { 343 // normalize cmd 344 cmd = strings.ToLower(cmd) 345 346 ClientConfig := c.ClientConfig 347 348 // Remove the "connect" keyword if it exists 349 addr := strings.TrimSpace(strings.Replace(cmd, "connect", "", -1)) 350 if addr == "" { 351 ClientConfig.URL = c.URL 352 } else { 353 url, err := client.ParseConnectionString(addr, c.Ssl) 354 if err != nil { 355 return err 356 } 357 358 ClientConfig.URL = url 359 } 360 361 ClientConfig.UserAgent = "InfluxDBShell/" + c.ClientVersion 362 ClientConfig.Proxy = http.ProxyFromEnvironment 363 364 client, err := client.NewClient(ClientConfig) 365 if err != nil { 366 return fmt.Errorf("Could not create client %s", err) 367 } 368 c.Client = client 369 370 _, v, err := c.Client.Ping() 371 if err != nil { 372 return err 373 } 374 c.ServerVersion = v 375 376 // Update the command with the current connection information 377 c.URL = ClientConfig.URL 378 379 return nil 380} 381 382// SetAuth sets client authentication credentials. 
383func (c *CommandLine) SetAuth(cmd string) { 384 // If they pass in the entire command, we should parse it 385 // auth <username> <password> 386 args := strings.Fields(cmd) 387 if len(args) == 3 { 388 args = args[1:] 389 } else { 390 args = []string{} 391 } 392 393 if len(args) == 2 { 394 c.ClientConfig.Username = args[0] 395 c.ClientConfig.Password = args[1] 396 } else { 397 u, e := c.Line.Prompt("username: ") 398 if e != nil { 399 fmt.Printf("Unable to process input: %s", e) 400 return 401 } 402 c.ClientConfig.Username = strings.TrimSpace(u) 403 p, e := c.Line.PasswordPrompt("password: ") 404 if e != nil { 405 fmt.Printf("Unable to process input: %s", e) 406 return 407 } 408 c.ClientConfig.Password = p 409 } 410 411 // Update the client as well 412 c.Client.SetAuth(c.ClientConfig.Username, c.ClientConfig.Password) 413} 414 415func (c *CommandLine) clear(cmd string) { 416 args := strings.Split(strings.TrimSuffix(strings.TrimSpace(cmd), ";"), " ") 417 v := strings.ToLower(strings.Join(args[1:], " ")) 418 switch v { 419 case "database", "db": 420 c.Database = "" 421 fmt.Println("database context cleared") 422 return 423 case "retention policy", "rp": 424 c.RetentionPolicy = "" 425 fmt.Println("retention policy context cleared") 426 return 427 default: 428 if len(args) > 1 { 429 fmt.Printf("invalid command %q.\n", v) 430 } 431 fmt.Println(`Possible commands for 'clear' are: 432 # Clear the database context 433 clear database 434 clear db 435 436 # Clear the retention policy context 437 clear retention policy 438 clear rp 439 `) 440 } 441} 442 443func (c *CommandLine) use(cmd string) { 444 args := strings.SplitAfterN(strings.TrimSuffix(strings.TrimSpace(cmd), ";"), " ", 2) 445 if len(args) != 2 { 446 fmt.Printf("Could not parse database name from %q.\n", cmd) 447 return 448 } 449 450 stmt := args[1] 451 db, rp, err := parseDatabaseAndRetentionPolicy([]byte(stmt)) 452 if err != nil { 453 fmt.Printf("Unable to parse database or retention policy from %s", stmt) 454 
return 455 } 456 457 if !c.databaseExists(db) { 458 fmt.Println("DB does not exist!") 459 return 460 } 461 462 c.Database = db 463 fmt.Printf("Using database %s\n", db) 464 465 if rp != "" { 466 if !c.retentionPolicyExists(db, rp) { 467 return 468 } 469 c.RetentionPolicy = rp 470 fmt.Printf("Using retention policy %s\n", rp) 471 } 472} 473 474func (c *CommandLine) databaseExists(db string) bool { 475 // Validate if specified database exists 476 response, err := c.Client.Query(client.Query{Command: "SHOW DATABASES"}) 477 if err != nil { 478 fmt.Printf("ERR: %s\n", err) 479 return false 480 } else if err := response.Error(); err != nil { 481 if c.ClientConfig.Username == "" { 482 fmt.Printf("ERR: %s\n", err) 483 return false 484 } 485 // TODO(jsternberg): Fix SHOW DATABASES to be user-aware #6397. 486 // If we are unable to run SHOW DATABASES, display a warning and use the 487 // database anyway in case the person doesn't have permission to run the 488 // command, but does have permission to use the database. 489 fmt.Printf("WARN: %s\n", err) 490 } else { 491 // Verify the provided database exists 492 if databaseExists := func() bool { 493 for _, result := range response.Results { 494 for _, row := range result.Series { 495 if row.Name == "databases" { 496 for _, values := range row.Values { 497 for _, database := range values { 498 if database == db { 499 return true 500 } 501 } 502 } 503 } 504 } 505 } 506 return false 507 }(); !databaseExists { 508 fmt.Printf("ERR: Database %s doesn't exist. 
Run SHOW DATABASES for a list of existing databases.\n", db) 509 return false 510 } 511 } 512 return true 513} 514 515func (c *CommandLine) retentionPolicyExists(db, rp string) bool { 516 // Validate if specified database exists 517 response, err := c.Client.Query(client.Query{Command: fmt.Sprintf("SHOW RETENTION POLICIES ON %q", db)}) 518 if err != nil { 519 fmt.Printf("ERR: %s\n", err) 520 return false 521 } else if err := response.Error(); err != nil { 522 if c.ClientConfig.Username == "" { 523 fmt.Printf("ERR: %s\n", err) 524 return false 525 } 526 fmt.Printf("WARN: %s\n", err) 527 } else { 528 // Verify the provided database exists 529 if retentionPolicyExists := func() bool { 530 for _, result := range response.Results { 531 for _, row := range result.Series { 532 for _, values := range row.Values { 533 for i, v := range values { 534 if i != 0 { 535 continue 536 } 537 if v == rp { 538 return true 539 } 540 } 541 } 542 } 543 } 544 return false 545 }(); !retentionPolicyExists { 546 fmt.Printf("ERR: RETENTION POLICY %s doesn't exist. Run SHOW RETENTION POLICIES ON %q for a list of existing retention polices.\n", rp, db) 547 return false 548 } 549 } 550 return true 551} 552 553func (c *CommandLine) node(cmd string) { 554 args := strings.Split(strings.TrimSuffix(strings.TrimSpace(cmd), ";"), " ") 555 if len(args) != 2 { 556 fmt.Println("Improper number of arguments for 'node' command, requires exactly one.") 557 return 558 } 559 560 if args[1] == "clear" { 561 c.NodeID = 0 562 return 563 } 564 565 id, err := strconv.Atoi(args[1]) 566 if err != nil { 567 fmt.Printf("Unable to parse node id from %s. 
Must be an integer or 'clear'.\n", args[1]) 568 return 569 } 570 c.NodeID = id 571} 572 573// SetChunkSize sets the chunk size 574// 0 sets it back to the default 575func (c *CommandLine) SetChunkSize(cmd string) { 576 // normalize cmd 577 cmd = strings.ToLower(cmd) 578 cmd = strings.Join(strings.Fields(cmd), " ") 579 580 // Remove the "chunk size" keyword if it exists 581 cmd = strings.TrimPrefix(cmd, "chunk size ") 582 583 // Remove the "chunk" keyword if it exists 584 // allows them to use `chunk 50` as a shortcut 585 cmd = strings.TrimPrefix(cmd, "chunk ") 586 587 if n, err := strconv.ParseInt(cmd, 10, 64); err == nil { 588 c.ChunkSize = int(n) 589 if c.ChunkSize <= 0 { 590 c.ChunkSize = 0 591 } 592 fmt.Printf("chunk size set to %d\n", c.ChunkSize) 593 } else { 594 fmt.Printf("unable to parse chunk size from %q\n", cmd) 595 } 596} 597 598// SetPrecision sets client precision. 599func (c *CommandLine) SetPrecision(cmd string) { 600 // normalize cmd 601 cmd = strings.ToLower(cmd) 602 603 // Remove the "precision" keyword if it exists 604 cmd = strings.TrimSpace(strings.Replace(cmd, "precision", "", -1)) 605 606 switch cmd { 607 case "h", "m", "s", "ms", "u", "ns": 608 c.ClientConfig.Precision = cmd 609 c.Client.SetPrecision(c.ClientConfig.Precision) 610 case "rfc3339": 611 c.ClientConfig.Precision = "" 612 c.Client.SetPrecision(c.ClientConfig.Precision) 613 default: 614 fmt.Printf("Unknown precision %q. Please use rfc3339, h, m, s, ms, u or ns.\n", cmd) 615 } 616} 617 618// SetFormat sets output format. 619func (c *CommandLine) SetFormat(cmd string) { 620 // normalize cmd 621 cmd = strings.ToLower(cmd) 622 // Remove the "format" keyword if it exists 623 cmd = strings.TrimSpace(strings.Replace(cmd, "format", "", -1)) 624 625 switch cmd { 626 case "json", "csv", "column": 627 c.Format = cmd 628 default: 629 fmt.Printf("Unknown format %q. Please use json, csv, or column.\n", cmd) 630 } 631} 632 633// SetWriteConsistency sets write consistency level. 
634func (c *CommandLine) SetWriteConsistency(cmd string) { 635 // normalize cmd 636 cmd = strings.ToLower(cmd) 637 // Remove the "consistency" keyword if it exists 638 cmd = strings.TrimSpace(strings.Replace(cmd, "consistency", "", -1)) 639 640 _, err := models.ParseConsistencyLevel(cmd) 641 if err != nil { 642 fmt.Printf("Unknown consistency level %q. Please use any, one, quorum, or all.\n", cmd) 643 return 644 } 645 c.ClientConfig.WriteConsistency = cmd 646} 647 648// isWhitespace returns true if the rune is a space, tab, or newline. 649func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' } 650 651// isLetter returns true if the rune is a letter. 652func isLetter(ch rune) bool { return (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') } 653 654// isDigit returns true if the rune is a digit. 655func isDigit(ch rune) bool { return (ch >= '0' && ch <= '9') } 656 657// isIdentFirstChar returns true if the rune can be used as the first char in an unquoted identifer. 658func isIdentFirstChar(ch rune) bool { return isLetter(ch) || ch == '_' } 659 660// isIdentChar returns true if the rune can be used in an unquoted identifier. 
661func isNotIdentChar(ch rune) bool { return !(isLetter(ch) || isDigit(ch) || ch == '_') } 662 663func parseUnquotedIdentifier(stmt string) (string, string) { 664 if fields := strings.FieldsFunc(stmt, isNotIdentChar); len(fields) > 0 { 665 return fields[0], strings.TrimPrefix(stmt, fields[0]) 666 } 667 return "", stmt 668} 669 670func parseDoubleQuotedIdentifier(stmt string) (string, string) { 671 escapeNext := false 672 fields := strings.FieldsFunc(stmt, func(ch rune) bool { 673 if ch == '\\' { 674 escapeNext = true 675 } else if ch == '"' { 676 if !escapeNext { 677 return true 678 } 679 escapeNext = false 680 } 681 return false 682 }) 683 if len(fields) > 0 { 684 return fields[0], strings.TrimPrefix(stmt, "\""+fields[0]+"\"") 685 } 686 return "", stmt 687} 688 689func parseNextIdentifier(stmt string) (ident, remainder string) { 690 if len(stmt) > 0 { 691 switch { 692 case isWhitespace(rune(stmt[0])): 693 return parseNextIdentifier(stmt[1:]) 694 case isIdentFirstChar(rune(stmt[0])): 695 return parseUnquotedIdentifier(stmt) 696 case stmt[0] == '"': 697 return parseDoubleQuotedIdentifier(stmt) 698 } 699 } 700 return "", stmt 701} 702 703func (c *CommandLine) parseInto(stmt string) *client.BatchPoints { 704 ident, stmt := parseNextIdentifier(stmt) 705 db, rp := c.Database, c.RetentionPolicy 706 if strings.HasPrefix(stmt, ".") { 707 db = ident 708 ident, stmt = parseNextIdentifier(stmt[1:]) 709 } 710 if strings.HasPrefix(stmt, " ") { 711 rp = ident 712 stmt = stmt[1:] 713 } 714 715 return &client.BatchPoints{ 716 Points: []client.Point{ 717 client.Point{Raw: stmt}, 718 }, 719 Database: db, 720 RetentionPolicy: rp, 721 Precision: c.ClientConfig.Precision, 722 WriteConsistency: c.ClientConfig.WriteConsistency, 723 } 724} 725 726func (c *CommandLine) parseInsert(stmt string) (*client.BatchPoints, error) { 727 i, point := parseNextIdentifier(stmt) 728 if !strings.EqualFold(i, "insert") { 729 return nil, fmt.Errorf("found %s, expected INSERT", i) 730 } 731 if i, r := 
parseNextIdentifier(point); strings.EqualFold(i, "into") { 732 bp := c.parseInto(r) 733 return bp, nil 734 } 735 return &client.BatchPoints{ 736 Points: []client.Point{ 737 client.Point{Raw: point}, 738 }, 739 Database: c.Database, 740 RetentionPolicy: c.RetentionPolicy, 741 Precision: c.ClientConfig.Precision, 742 WriteConsistency: c.ClientConfig.WriteConsistency, 743 }, nil 744} 745 746// Insert runs an INSERT statement. 747func (c *CommandLine) Insert(stmt string) error { 748 bp, err := c.parseInsert(stmt) 749 if err != nil { 750 fmt.Printf("ERR: %s\n", err) 751 return nil 752 } 753 if _, err := c.Client.Write(*bp); err != nil { 754 fmt.Printf("ERR: %s\n", err) 755 if c.Database == "" { 756 fmt.Println("Note: error may be due to not setting a database or retention policy.") 757 fmt.Println(`Please set a database with the command "use <database>" or`) 758 fmt.Println("INSERT INTO <database>.<retention-policy> <point>") 759 } 760 } 761 return nil 762} 763 764// query creates a query struct to be used with the client. 765func (c *CommandLine) query(query string) client.Query { 766 return client.Query{ 767 Command: query, 768 Database: c.Database, 769 RetentionPolicy: c.RetentionPolicy, 770 Chunked: c.Chunked, 771 ChunkSize: c.ChunkSize, 772 NodeID: c.NodeID, 773 } 774} 775 776// ExecuteQuery runs any query statement. 
777func (c *CommandLine) ExecuteQuery(query string) error { 778 // If we have a retention policy, we need to rewrite the statement sources 779 if c.RetentionPolicy != "" { 780 pq, err := influxql.NewParser(strings.NewReader(query)).ParseQuery() 781 if err != nil { 782 fmt.Printf("ERR: %s\n", err) 783 return err 784 } 785 for _, stmt := range pq.Statements { 786 if selectStatement, ok := stmt.(*influxql.SelectStatement); ok { 787 influxql.WalkFunc(selectStatement.Sources, func(n influxql.Node) { 788 if t, ok := n.(*influxql.Measurement); ok { 789 if t.Database == "" && c.Database != "" { 790 t.Database = c.Database 791 } 792 if t.RetentionPolicy == "" && c.RetentionPolicy != "" { 793 t.RetentionPolicy = c.RetentionPolicy 794 } 795 } 796 }) 797 } 798 } 799 query = pq.String() 800 } 801 802 ctx := context.Background() 803 if !c.IgnoreSignals { 804 done := make(chan struct{}) 805 defer close(done) 806 807 var cancel func() 808 ctx, cancel = context.WithCancel(ctx) 809 go func() { 810 select { 811 case <-done: 812 case <-c.osSignals: 813 cancel() 814 } 815 }() 816 } 817 818 response, err := c.Client.QueryContext(ctx, c.query(query)) 819 if err != nil { 820 if err.Error() == "" { 821 err = ctx.Err() 822 if err == context.Canceled { 823 err = errors.New("aborted by user") 824 } else if err == nil { 825 err = errors.New("no data received") 826 } 827 } 828 fmt.Printf("ERR: %s\n", err) 829 return err 830 } 831 c.FormatResponse(response, os.Stdout) 832 if err := response.Error(); err != nil { 833 fmt.Printf("ERR: %s\n", response.Error()) 834 if c.Database == "" { 835 fmt.Println("Warning: It is possible this error is due to not setting a database.") 836 fmt.Println(`Please set a database with the command "use <database>".`) 837 } 838 return err 839 } 840 return nil 841} 842 843// FormatResponse formats output to the previously chosen format. 
844func (c *CommandLine) FormatResponse(response *client.Response, w io.Writer) { 845 switch c.Format { 846 case "json": 847 c.writeJSON(response, w) 848 case "csv": 849 c.writeCSV(response, w) 850 case "column": 851 c.writeColumns(response, w) 852 default: 853 fmt.Fprintf(w, "Unknown output format %q.\n", c.Format) 854 } 855} 856 857func (c *CommandLine) writeJSON(response *client.Response, w io.Writer) { 858 var data []byte 859 var err error 860 if c.Pretty { 861 data, err = json.MarshalIndent(response, "", " ") 862 } else { 863 data, err = json.Marshal(response) 864 } 865 if err != nil { 866 fmt.Fprintf(w, "Unable to parse json: %s\n", err) 867 return 868 } 869 fmt.Fprintln(w, string(data)) 870} 871 872func tagsEqual(prev, current map[string]string) bool { 873 return reflect.DeepEqual(prev, current) 874} 875 876func columnsEqual(prev, current []string) bool { 877 return reflect.DeepEqual(prev, current) 878} 879 880func headersEqual(prev, current models.Row) bool { 881 if prev.Name != current.Name { 882 return false 883 } 884 return tagsEqual(prev.Tags, current.Tags) && columnsEqual(prev.Columns, current.Columns) 885} 886 887func (c *CommandLine) writeCSV(response *client.Response, w io.Writer) { 888 csvw := csv.NewWriter(w) 889 var previousHeaders models.Row 890 for _, result := range response.Results { 891 suppressHeaders := len(result.Series) > 0 && headersEqual(previousHeaders, result.Series[0]) 892 if !suppressHeaders && len(result.Series) > 0 { 893 previousHeaders = models.Row{ 894 Name: result.Series[0].Name, 895 Tags: result.Series[0].Tags, 896 Columns: result.Series[0].Columns, 897 } 898 } 899 900 // Create a tabbed writer for each result as they won't always line up 901 rows := c.formatResults(result, "\t", suppressHeaders) 902 for _, r := range rows { 903 csvw.Write(strings.Split(r, "\t")) 904 } 905 } 906 csvw.Flush() 907} 908 909func (c *CommandLine) writeColumns(response *client.Response, w io.Writer) { 910 // Create a tabbed writer for each result 
as they won't always line up 911 writer := new(tabwriter.Writer) 912 writer.Init(w, 0, 8, 1, ' ', 0) 913 914 var previousHeaders models.Row 915 for i, result := range response.Results { 916 // Print out all messages first 917 for _, m := range result.Messages { 918 fmt.Fprintf(w, "%s: %s.\n", m.Level, m.Text) 919 } 920 // Check to see if the headers are the same as the previous row. If so, suppress them in the output 921 suppressHeaders := len(result.Series) > 0 && headersEqual(previousHeaders, result.Series[0]) 922 if !suppressHeaders && len(result.Series) > 0 { 923 previousHeaders = models.Row{ 924 Name: result.Series[0].Name, 925 Tags: result.Series[0].Tags, 926 Columns: result.Series[0].Columns, 927 } 928 } 929 930 // If we are suppressing headers, don't output the extra line return. If we 931 // aren't suppressing headers, then we put out line returns between results 932 // (not before the first result, and not after the last result). 933 if !suppressHeaders && i > 0 { 934 fmt.Fprintln(writer, "") 935 } 936 937 rows := c.formatResults(result, "\t", suppressHeaders) 938 for _, r := range rows { 939 fmt.Fprintln(writer, r) 940 } 941 942 } 943 writer.Flush() 944} 945 946// formatResults will behave differently if you are formatting for columns or csv 947func (c *CommandLine) formatResults(result client.Result, separator string, suppressHeaders bool) []string { 948 rows := []string{} 949 // Create a tabbed writer for each result as they won't always line up 950 for i, row := range result.Series { 951 // gather tags 952 tags := []string{} 953 for k, v := range row.Tags { 954 tags = append(tags, fmt.Sprintf("%s=%s", k, v)) 955 sort.Strings(tags) 956 } 957 958 columnNames := []string{} 959 960 // Only put name/tags in a column if format is csv 961 if c.Format == "csv" { 962 if len(tags) > 0 { 963 columnNames = append([]string{"tags"}, columnNames...) 964 } 965 966 if row.Name != "" { 967 columnNames = append([]string{"name"}, columnNames...) 
968 } 969 } 970 971 columnNames = append(columnNames, row.Columns...) 972 973 // Output a line separator if we have more than one set or results and format is column 974 if i > 0 && c.Format == "column" && !suppressHeaders { 975 rows = append(rows, "") 976 } 977 978 // If we are column format, we break out the name/tag to separate lines 979 if c.Format == "column" && !suppressHeaders { 980 if row.Name != "" { 981 n := fmt.Sprintf("name: %s", row.Name) 982 rows = append(rows, n) 983 } 984 if len(tags) > 0 { 985 t := fmt.Sprintf("tags: %s", (strings.Join(tags, ", "))) 986 rows = append(rows, t) 987 } 988 } 989 990 if !suppressHeaders { 991 rows = append(rows, strings.Join(columnNames, separator)) 992 } 993 994 // if format is column, write dashes under each column 995 if c.Format == "column" && !suppressHeaders { 996 lines := []string{} 997 for _, columnName := range columnNames { 998 lines = append(lines, strings.Repeat("-", len(columnName))) 999 } 1000 rows = append(rows, strings.Join(lines, separator)) 1001 } 1002 1003 for _, v := range row.Values { 1004 var values []string 1005 if c.Format == "csv" { 1006 if row.Name != "" { 1007 values = append(values, row.Name) 1008 } 1009 if len(tags) > 0 { 1010 values = append(values, strings.Join(tags, ",")) 1011 } 1012 } 1013 1014 for _, vv := range v { 1015 values = append(values, interfaceToString(vv)) 1016 } 1017 rows = append(rows, strings.Join(values, separator)) 1018 } 1019 } 1020 return rows 1021} 1022 1023func interfaceToString(v interface{}) string { 1024 switch t := v.(type) { 1025 case nil: 1026 return "" 1027 case bool: 1028 return fmt.Sprintf("%v", v) 1029 case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, uintptr: 1030 return fmt.Sprintf("%d", t) 1031 case float32, float64: 1032 return fmt.Sprintf("%v", t) 1033 default: 1034 return fmt.Sprintf("%v", t) 1035 } 1036} 1037 1038// Settings prints current settings. 
1039func (c *CommandLine) Settings() { 1040 w := new(tabwriter.Writer) 1041 w.Init(os.Stdout, 0, 1, 1, ' ', 0) 1042 fmt.Fprintln(w, "Setting\tValue") 1043 fmt.Fprintln(w, "--------\t--------") 1044 fmt.Fprintf(w, "URL\t%s\n", c.URL.String()) 1045 fmt.Fprintf(w, "Username\t%s\n", c.ClientConfig.Username) 1046 fmt.Fprintf(w, "Database\t%s\n", c.Database) 1047 fmt.Fprintf(w, "RetentionPolicy\t%s\n", c.RetentionPolicy) 1048 fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty) 1049 fmt.Fprintf(w, "Format\t%s\n", c.Format) 1050 fmt.Fprintf(w, "Write Consistency\t%s\n", c.ClientConfig.WriteConsistency) 1051 fmt.Fprintf(w, "Chunked\t%v\n", c.Chunked) 1052 fmt.Fprintf(w, "Chunk Size\t%d\n", c.ChunkSize) 1053 fmt.Fprintln(w) 1054 w.Flush() 1055} 1056 1057func (c *CommandLine) help() { 1058 fmt.Println(`Usage: 1059 connect <host:port> connects to another node specified by host:port 1060 auth prompts for username and password 1061 pretty toggles pretty print for the json format 1062 chunked turns on chunked responses from server 1063 chunk size <size> sets the size of the chunked responses. Set to 0 to reset to the default chunked size 1064 use <db_name> sets current database 1065 format <format> specifies the format of the server responses: json, csv, or column 1066 precision <format> specifies the format of the timestamp: rfc3339, h, m, s, ms, u or ns 1067 consistency <level> sets write consistency level: any, one, quorum, or all 1068 history displays command history 1069 settings outputs the current settings for the shell 1070 clear clears settings such as database or retention policy. 
run 'clear' for help 1071 exit/quit/ctrl+d quits the influx shell 1072 1073 show databases show database names 1074 show series show series information 1075 show measurements show measurement information 1076 show tag keys show tag key information 1077 show field keys show field key information 1078 1079 A full list of influxql commands can be found at: 1080 https://docs.influxdata.com/influxdb/latest/query_language/spec/`) 1081} 1082 1083func (c *CommandLine) history() { 1084 var buf bytes.Buffer 1085 c.Line.WriteHistory(&buf) 1086 fmt.Print(buf.String()) 1087} 1088 1089func (c *CommandLine) saveHistory() { 1090 if c.historyFilePath == "" { 1091 return 1092 } 1093 if historyFile, err := os.Create(c.historyFilePath); err != nil { 1094 fmt.Printf("There was an error writing history file: %s\n", err) 1095 } else { 1096 c.Line.WriteHistory(historyFile) 1097 historyFile.Close() 1098 } 1099} 1100 1101func (c *CommandLine) gopher() { 1102 fmt.Println(` 1103 .-::-::://:-::- .:/++/' 1104 '://:-''/oo+//++o+/.://o- ./+: 1105 .:-. '++- .o/ '+yydhy' o- 1106 .:/. .h: :osoys .smMN- :/ 1107 -/:.' s- /MMMymh. '/y/ s' 1108 -+s:'''' d -mMMms// '-/o: 1109 -/++/++/////:. o: '... s- :s. 1110 :+-+s-' ':/' 's- /+ 'o: 1111 '+-'o: /ydhsh. '//. '-o- o- 1112 .y. o: .MMMdm+y ':+++:::/+:.' s: 1113 .-h/ y- 'sdmds'h -+ydds:::-.' 'h. 1114 .//-.d' o: '.' 'dsNMMMNh:.:++' :y 1115 +y. 'd 's. .s:mddds: ++ o/ 1116 'N- odd 'o/. './o-s-' .---+++' o- 1117 'N' yNd .://:/:::::. -s -+/s/./s' 'o/' 1118 so' .h '''' ////s: '+. .s +y' 1119 os/-.y' 's' 'y::+ +d' 1120 '.:o/ -+:-:.' so.---.' 1121 o' 'd-.''/s' 1122 .s' :y.''.y 1123 -s mo:::' 1124 :: yh 1125 // '''' /M' 1126 o+ .s///:/. 'N: 1127 :+ /: -s' ho 1128 's- -/s/:+/.+h' +h 1129 ys' ':' '-. -d 1130 oh .h 1131 /o .s 1132 s. .h 1133 -y .d 1134 m/ -h 1135 +d /o 1136 'N- y: 1137 h: m. 1138 s- -d 1139 o- s+ 1140 +- 'm' 1141 s/ oo--. 1142 y- /s ':+' 1143 s' 'od--' .d: 1144 -+ ':o: ':+-/+ 1145 y- .:+- ' 1146 //o- '.:+/. 1147 .-:+/' ''-/+/. 
1148 ./:' ''.:o+/-' 1149 .+o:/:/+-' ''.-+ooo/-' 1150 o: -h///++////-. 1151 /: .o/ 1152 //+ 'y 1153 ./sooy.`) 1154} 1155 1156// Version prints the CLI version. 1157func (c *CommandLine) Version() { 1158 fmt.Println("InfluxDB shell version:", c.ClientVersion) 1159} 1160 1161func (c *CommandLine) exit() { 1162 // write to history file 1163 c.saveHistory() 1164 // release line resources 1165 c.Line.Close() 1166 c.Line = nil 1167} 1168 1169func (c *CommandLine) ExecuteFluxQuery(query string) error { 1170 ctx := context.Background() 1171 if !c.IgnoreSignals { 1172 done := make(chan struct{}) 1173 defer close(done) 1174 1175 var cancel func() 1176 ctx, cancel = context.WithCancel(ctx) 1177 go func() { 1178 select { 1179 case <-done: 1180 case <-c.osSignals: 1181 cancel() 1182 } 1183 }() 1184 } 1185 1186 repl, err := getFluxREPL(c.URL, c.ClientConfig.Username, c.ClientConfig.Password) 1187 if err != nil { 1188 return err 1189 } 1190 1191 return repl.Input(query) 1192} 1193 1194type QueryLanguage uint8 1195 1196const ( 1197 QueryLanguageInfluxQL QueryLanguage = iota 1198 QueryLanguageFlux 1199) 1200 1201func (l *QueryLanguage) Set(s string) error { 1202 switch s { 1203 case "influxql": 1204 *l = QueryLanguageInfluxQL 1205 case "flux": 1206 *l = QueryLanguageFlux 1207 default: 1208 return fmt.Errorf("%q not supported: specify influxql or flux", s) 1209 } 1210 return nil 1211} 1212 1213func (l *QueryLanguage) String() string { 1214 switch *l { 1215 case QueryLanguageInfluxQL: 1216 return "influxql" 1217 case QueryLanguageFlux: 1218 return "flux" 1219 } 1220 return fmt.Sprintf("QueryLanguage(%d)", uint8(*l)) 1221} 1222