1// Package cli contains the logic of the influx command line client.
2package cli // import "github.com/influxdata/influxdb/cmd/influx/cli"
3
4import (
5	"bytes"
6	"context"
7	"encoding/csv"
8	"encoding/json"
9	"errors"
10	"fmt"
11	"io"
12	"io/ioutil"
13	"net/http"
14	"net/url"
15	"os"
16	"os/signal"
17	"path/filepath"
18	"reflect"
19	"runtime"
20	"sort"
21	"strconv"
22	"strings"
23	"syscall"
24	"text/tabwriter"
25
26	"golang.org/x/crypto/ssh/terminal"
27
28	"github.com/influxdata/influxdb/client"
29	v8 "github.com/influxdata/influxdb/importer/v8"
30	"github.com/influxdata/influxdb/models"
31	"github.com/influxdata/influxql"
32	"github.com/peterh/liner"
33)
34
35// ErrBlankCommand is returned when a parsed command is empty.
36var ErrBlankCommand = errors.New("empty input")
37
38// CommandLine holds CLI configuration and state.
39type CommandLine struct {
40	Line            *liner.State
41	URL             url.URL
42	Host            string
43	Port            int
44	PathPrefix      string
45	Database        string
46	Type            QueryLanguage
47	Ssl             bool
48	RetentionPolicy string
49	ClientVersion   string
50	ServerVersion   string
51	Pretty          bool   // controls pretty print for json
52	Format          string // controls the output format.  Valid values are json, csv, or column
53	Execute         string
54	ShowVersion     bool
55	Import          bool
56	Chunked         bool
57	ChunkSize       int
58	NodeID          int
59	Quit            chan struct{}
60	IgnoreSignals   bool // Ignore signals normally caught by this process (used primarily for testing)
61	ForceTTY        bool // Force the CLI to act as if it were connected to a TTY
62	osSignals       chan os.Signal
63	historyFilePath string
64
65	Client         *client.Client
66	ClientConfig   client.Config // Client config options.
67	ImporterConfig v8.Config     // Importer configuration options.
68}
69
70// New returns an instance of CommandLine with the specified client version.
71func New(version string) *CommandLine {
72	return &CommandLine{
73		ClientVersion: version,
74		Quit:          make(chan struct{}, 1),
75		osSignals:     make(chan os.Signal, 1),
76		Chunked:       true,
77	}
78}
79
80// Run executes the CLI.
81func (c *CommandLine) Run() error {
82	hasTTY := c.ForceTTY || terminal.IsTerminal(int(os.Stdin.Fd()))
83
84	var promptForPassword bool
85	// determine if they set the password flag but provided no value
86	for _, v := range os.Args {
87		v = strings.ToLower(v)
88		if (strings.HasPrefix(v, "-password") || strings.HasPrefix(v, "--password")) && c.ClientConfig.Password == "" {
89			promptForPassword = true
90			break
91		}
92	}
93
94	// Check if we will be able to prompt for the password later.
95	if promptForPassword && !hasTTY {
96		return errors.New("unable to prompt for a password with no TTY")
97	}
98
99	// Read environment variables for username/password.
100	if c.ClientConfig.Username == "" {
101		c.ClientConfig.Username = os.Getenv("INFLUX_USERNAME")
102	}
103	// If we are going to be prompted for a password, always use the entered password.
104	if promptForPassword {
105		// Open the liner (temporarily) and prompt for the password.
106		p, e := func() (string, error) {
107			l := liner.NewLiner()
108			defer l.Close()
109			return l.PasswordPrompt("password: ")
110		}()
111		if e != nil {
112			return errors.New("Unable to parse password")
113		}
114		c.ClientConfig.Password = p
115	} else if c.ClientConfig.Password == "" {
116		c.ClientConfig.Password = os.Getenv("INFLUX_PASSWORD")
117	}
118
119	addr := fmt.Sprintf("%s:%d/%s", c.Host, c.Port, c.PathPrefix)
120	url, err := client.ParseConnectionString(addr, c.Ssl)
121	if err != nil {
122		return err
123	}
124
125	c.URL = url
126
127	if err := c.Connect(""); err != nil {
128		msg := "Please check your connection settings and ensure 'influxd' is running."
129		if !c.Ssl && strings.Contains(err.Error(), "malformed HTTP response") {
130			// Attempt to connect with SSL and disable secure SSL for this test.
131			c.Ssl = true
132			unsafeSsl := c.ClientConfig.UnsafeSsl
133			c.ClientConfig.UnsafeSsl = true
134			if err := c.Connect(""); err == nil {
135				msg = "Please use the -ssl flag to connect using SSL."
136			}
137			c.Ssl = false
138			c.ClientConfig.UnsafeSsl = unsafeSsl
139		} else if c.Ssl && !c.ClientConfig.UnsafeSsl && strings.Contains(err.Error(), "certificate is valid for") {
140			// Attempt to connect with an insecure connection just to see if it works.
141			c.ClientConfig.UnsafeSsl = true
142			if err := c.Connect(""); err == nil {
143				msg = "You may use -unsafeSsl to connect anyway, but the SSL connection will not be secure."
144			}
145			c.ClientConfig.UnsafeSsl = false
146		}
147		return fmt.Errorf("Failed to connect to %s: %s\n%s", c.Client.Addr(), err.Error(), msg)
148	}
149
150	// Modify precision.
151	c.SetPrecision(c.ClientConfig.Precision)
152
153	if c.Execute != "" {
154		switch c.Type {
155		case QueryLanguageFlux:
156			return c.ExecuteFluxQuery(c.Execute)
157		default:
158			// Make the non-interactive mode send everything through the CLI's parser
159			// the same way the interactive mode works
160			lines := strings.Split(c.Execute, "\n")
161			for _, line := range lines {
162				if err := c.ParseCommand(line); err != nil {
163					return err
164				}
165			}
166		}
167		return nil
168	}
169
170	if c.Import {
171		// Copy the latest importer config and inject the latest client config
172		// into it.
173		config := c.ImporterConfig
174		config.Config = c.ClientConfig
175		config.URL = c.URL
176
177		i := v8.NewImporter(config)
178		if err := i.Import(); err != nil {
179			err = fmt.Errorf("ERROR: %s", err)
180			return err
181		}
182		return nil
183	}
184
185	if !hasTTY {
186		cmd, err := ioutil.ReadAll(os.Stdin)
187		if err != nil {
188			return err
189		}
190
191		switch c.Type {
192		case QueryLanguageFlux:
193			return c.ExecuteFluxQuery(string(cmd))
194		default:
195			return c.ExecuteQuery(string(cmd))
196		}
197	}
198
199	if !c.IgnoreSignals {
200		// register OS signals for graceful termination
201		signal.Notify(c.osSignals, syscall.SIGINT, syscall.SIGTERM)
202	}
203
204	if len(c.ServerVersion) == 0 {
205		fmt.Printf("WARN: Connected to %s, but found no server version.\n", c.Client.Addr())
206		fmt.Printf("Are you sure an InfluxDB server is listening at the given address?\n")
207	} else {
208		fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.ServerVersion)
209	}
210
211	c.Version()
212
213	if c.Type == QueryLanguageFlux {
214		repl, err := getFluxREPL(c.URL, c.ClientConfig.Username, c.ClientConfig.Password)
215		if err != nil {
216			return err
217		}
218		repl.Run()
219		os.Exit(0)
220	}
221
222	c.Line = liner.NewLiner()
223	defer c.Line.Close()
224
225	c.Line.SetMultiLineMode(true)
226
227	// Only load/write history if HOME environment variable is set.
228	var historyDir string
229	if runtime.GOOS == "windows" {
230		if userDir := os.Getenv("USERPROFILE"); userDir != "" {
231			historyDir = userDir
232		}
233	}
234
235	if homeDir := os.Getenv("HOME"); homeDir != "" {
236		historyDir = homeDir
237	}
238
239	// Attempt to load the history file.
240	if historyDir != "" {
241		c.historyFilePath = filepath.Join(historyDir, ".influx_history")
242		if historyFile, err := os.Open(c.historyFilePath); err == nil {
243			c.Line.ReadHistory(historyFile)
244			historyFile.Close()
245		}
246	}
247
248	// read from prompt until exit is run
249	return c.mainLoop()
250}
251
252// mainLoop runs the main prompt loop for the CLI.
253func (c *CommandLine) mainLoop() error {
254	for {
255		select {
256		case <-c.osSignals:
257			c.exit()
258			return nil
259		case <-c.Quit:
260			c.exit()
261			return nil
262		default:
263			l, e := c.Line.Prompt("> ")
264			if e == io.EOF {
265				// Instead of die, register that someone exited the program gracefully
266				l = "exit"
267			} else if e != nil {
268				c.exit()
269				return e
270			}
271			if err := c.ParseCommand(l); err != ErrBlankCommand && !strings.HasPrefix(strings.TrimSpace(l), "auth") {
272				l = influxql.Sanitize(l)
273				c.Line.AppendHistory(l)
274				c.saveHistory()
275			}
276		}
277	}
278}
279
280// ParseCommand parses an instruction and calls the related method
281// or executes the command as a query against InfluxDB.
282func (c *CommandLine) ParseCommand(cmd string) error {
283	lcmd := strings.TrimSpace(strings.ToLower(cmd))
284	tokens := strings.Fields(lcmd)
285
286	if len(tokens) > 0 {
287		switch tokens[0] {
288		case "exit", "quit":
289			close(c.Quit)
290		case "gopher":
291			c.gopher()
292		case "connect":
293			return c.Connect(cmd)
294		case "auth":
295			c.SetAuth(cmd)
296		case "help":
297			c.help()
298		case "history":
299			c.history()
300		case "format":
301			c.SetFormat(cmd)
302		case "precision":
303			c.SetPrecision(cmd)
304		case "consistency":
305			c.SetWriteConsistency(cmd)
306		case "settings":
307			c.Settings()
308		case "chunked":
309			c.Chunked = !c.Chunked
310			if c.Chunked {
311				fmt.Println("chunked responses enabled")
312			} else {
313				fmt.Println("chunked reponses disabled")
314			}
315		case "chunk":
316			c.SetChunkSize(cmd)
317		case "pretty":
318			c.Pretty = !c.Pretty
319			if c.Pretty {
320				fmt.Println("Pretty print enabled")
321			} else {
322				fmt.Println("Pretty print disabled")
323			}
324		case "use":
325			c.use(cmd)
326		case "node":
327			c.node(cmd)
328		case "insert":
329			return c.Insert(cmd)
330		case "clear":
331			c.clear(cmd)
332		default:
333			return c.ExecuteQuery(cmd)
334		}
335
336		return nil
337	}
338	return ErrBlankCommand
339}
340
341// Connect connects to a server.
342func (c *CommandLine) Connect(cmd string) error {
343	// normalize cmd
344	cmd = strings.ToLower(cmd)
345
346	ClientConfig := c.ClientConfig
347
348	// Remove the "connect" keyword if it exists
349	addr := strings.TrimSpace(strings.Replace(cmd, "connect", "", -1))
350	if addr == "" {
351		ClientConfig.URL = c.URL
352	} else {
353		url, err := client.ParseConnectionString(addr, c.Ssl)
354		if err != nil {
355			return err
356		}
357
358		ClientConfig.URL = url
359	}
360
361	ClientConfig.UserAgent = "InfluxDBShell/" + c.ClientVersion
362	ClientConfig.Proxy = http.ProxyFromEnvironment
363
364	client, err := client.NewClient(ClientConfig)
365	if err != nil {
366		return fmt.Errorf("Could not create client %s", err)
367	}
368	c.Client = client
369
370	_, v, err := c.Client.Ping()
371	if err != nil {
372		return err
373	}
374	c.ServerVersion = v
375
376	// Update the command with the current connection information
377	c.URL = ClientConfig.URL
378
379	return nil
380}
381
382// SetAuth sets client authentication credentials.
383func (c *CommandLine) SetAuth(cmd string) {
384	// If they pass in the entire command, we should parse it
385	// auth <username> <password>
386	args := strings.Fields(cmd)
387	if len(args) == 3 {
388		args = args[1:]
389	} else {
390		args = []string{}
391	}
392
393	if len(args) == 2 {
394		c.ClientConfig.Username = args[0]
395		c.ClientConfig.Password = args[1]
396	} else {
397		u, e := c.Line.Prompt("username: ")
398		if e != nil {
399			fmt.Printf("Unable to process input: %s", e)
400			return
401		}
402		c.ClientConfig.Username = strings.TrimSpace(u)
403		p, e := c.Line.PasswordPrompt("password: ")
404		if e != nil {
405			fmt.Printf("Unable to process input: %s", e)
406			return
407		}
408		c.ClientConfig.Password = p
409	}
410
411	// Update the client as well
412	c.Client.SetAuth(c.ClientConfig.Username, c.ClientConfig.Password)
413}
414
415func (c *CommandLine) clear(cmd string) {
416	args := strings.Split(strings.TrimSuffix(strings.TrimSpace(cmd), ";"), " ")
417	v := strings.ToLower(strings.Join(args[1:], " "))
418	switch v {
419	case "database", "db":
420		c.Database = ""
421		fmt.Println("database context cleared")
422		return
423	case "retention policy", "rp":
424		c.RetentionPolicy = ""
425		fmt.Println("retention policy context cleared")
426		return
427	default:
428		if len(args) > 1 {
429			fmt.Printf("invalid command %q.\n", v)
430		}
431		fmt.Println(`Possible commands for 'clear' are:
432    # Clear the database context
433    clear database
434    clear db
435
436    # Clear the retention policy context
437    clear retention policy
438    clear rp
439		`)
440	}
441}
442
443func (c *CommandLine) use(cmd string) {
444	args := strings.SplitAfterN(strings.TrimSuffix(strings.TrimSpace(cmd), ";"), " ", 2)
445	if len(args) != 2 {
446		fmt.Printf("Could not parse database name from %q.\n", cmd)
447		return
448	}
449
450	stmt := args[1]
451	db, rp, err := parseDatabaseAndRetentionPolicy([]byte(stmt))
452	if err != nil {
453		fmt.Printf("Unable to parse database or retention policy from %s", stmt)
454		return
455	}
456
457	if !c.databaseExists(db) {
458		fmt.Println("DB does not exist!")
459		return
460	}
461
462	c.Database = db
463	fmt.Printf("Using database %s\n", db)
464
465	if rp != "" {
466		if !c.retentionPolicyExists(db, rp) {
467			return
468		}
469		c.RetentionPolicy = rp
470		fmt.Printf("Using retention policy %s\n", rp)
471	}
472}
473
474func (c *CommandLine) databaseExists(db string) bool {
475	// Validate if specified database exists
476	response, err := c.Client.Query(client.Query{Command: "SHOW DATABASES"})
477	if err != nil {
478		fmt.Printf("ERR: %s\n", err)
479		return false
480	} else if err := response.Error(); err != nil {
481		if c.ClientConfig.Username == "" {
482			fmt.Printf("ERR: %s\n", err)
483			return false
484		}
485		// TODO(jsternberg): Fix SHOW DATABASES to be user-aware #6397.
486		// If we are unable to run SHOW DATABASES, display a warning and use the
487		// database anyway in case the person doesn't have permission to run the
488		// command, but does have permission to use the database.
489		fmt.Printf("WARN: %s\n", err)
490	} else {
491		// Verify the provided database exists
492		if databaseExists := func() bool {
493			for _, result := range response.Results {
494				for _, row := range result.Series {
495					if row.Name == "databases" {
496						for _, values := range row.Values {
497							for _, database := range values {
498								if database == db {
499									return true
500								}
501							}
502						}
503					}
504				}
505			}
506			return false
507		}(); !databaseExists {
508			fmt.Printf("ERR: Database %s doesn't exist. Run SHOW DATABASES for a list of existing databases.\n", db)
509			return false
510		}
511	}
512	return true
513}
514
515func (c *CommandLine) retentionPolicyExists(db, rp string) bool {
516	// Validate if specified database exists
517	response, err := c.Client.Query(client.Query{Command: fmt.Sprintf("SHOW RETENTION POLICIES ON %q", db)})
518	if err != nil {
519		fmt.Printf("ERR: %s\n", err)
520		return false
521	} else if err := response.Error(); err != nil {
522		if c.ClientConfig.Username == "" {
523			fmt.Printf("ERR: %s\n", err)
524			return false
525		}
526		fmt.Printf("WARN: %s\n", err)
527	} else {
528		// Verify the provided database exists
529		if retentionPolicyExists := func() bool {
530			for _, result := range response.Results {
531				for _, row := range result.Series {
532					for _, values := range row.Values {
533						for i, v := range values {
534							if i != 0 {
535								continue
536							}
537							if v == rp {
538								return true
539							}
540						}
541					}
542				}
543			}
544			return false
545		}(); !retentionPolicyExists {
546			fmt.Printf("ERR: RETENTION POLICY %s doesn't exist. Run SHOW RETENTION POLICIES ON %q for a list of existing retention polices.\n", rp, db)
547			return false
548		}
549	}
550	return true
551}
552
553func (c *CommandLine) node(cmd string) {
554	args := strings.Split(strings.TrimSuffix(strings.TrimSpace(cmd), ";"), " ")
555	if len(args) != 2 {
556		fmt.Println("Improper number of arguments for 'node' command, requires exactly one.")
557		return
558	}
559
560	if args[1] == "clear" {
561		c.NodeID = 0
562		return
563	}
564
565	id, err := strconv.Atoi(args[1])
566	if err != nil {
567		fmt.Printf("Unable to parse node id from %s. Must be an integer or 'clear'.\n", args[1])
568		return
569	}
570	c.NodeID = id
571}
572
573// SetChunkSize sets the chunk size
574// 0 sets it back to the default
575func (c *CommandLine) SetChunkSize(cmd string) {
576	// normalize cmd
577	cmd = strings.ToLower(cmd)
578	cmd = strings.Join(strings.Fields(cmd), " ")
579
580	// Remove the "chunk size" keyword if it exists
581	cmd = strings.TrimPrefix(cmd, "chunk size ")
582
583	// Remove the "chunk" keyword if it exists
584	// allows them to use `chunk 50` as a shortcut
585	cmd = strings.TrimPrefix(cmd, "chunk ")
586
587	if n, err := strconv.ParseInt(cmd, 10, 64); err == nil {
588		c.ChunkSize = int(n)
589		if c.ChunkSize <= 0 {
590			c.ChunkSize = 0
591		}
592		fmt.Printf("chunk size set to %d\n", c.ChunkSize)
593	} else {
594		fmt.Printf("unable to parse chunk size from %q\n", cmd)
595	}
596}
597
598// SetPrecision sets client precision.
599func (c *CommandLine) SetPrecision(cmd string) {
600	// normalize cmd
601	cmd = strings.ToLower(cmd)
602
603	// Remove the "precision" keyword if it exists
604	cmd = strings.TrimSpace(strings.Replace(cmd, "precision", "", -1))
605
606	switch cmd {
607	case "h", "m", "s", "ms", "u", "ns":
608		c.ClientConfig.Precision = cmd
609		c.Client.SetPrecision(c.ClientConfig.Precision)
610	case "rfc3339":
611		c.ClientConfig.Precision = ""
612		c.Client.SetPrecision(c.ClientConfig.Precision)
613	default:
614		fmt.Printf("Unknown precision %q. Please use rfc3339, h, m, s, ms, u or ns.\n", cmd)
615	}
616}
617
618// SetFormat sets output format.
619func (c *CommandLine) SetFormat(cmd string) {
620	// normalize cmd
621	cmd = strings.ToLower(cmd)
622	// Remove the "format" keyword if it exists
623	cmd = strings.TrimSpace(strings.Replace(cmd, "format", "", -1))
624
625	switch cmd {
626	case "json", "csv", "column":
627		c.Format = cmd
628	default:
629		fmt.Printf("Unknown format %q. Please use json, csv, or column.\n", cmd)
630	}
631}
632
633// SetWriteConsistency sets write consistency level.
634func (c *CommandLine) SetWriteConsistency(cmd string) {
635	// normalize cmd
636	cmd = strings.ToLower(cmd)
637	// Remove the "consistency" keyword if it exists
638	cmd = strings.TrimSpace(strings.Replace(cmd, "consistency", "", -1))
639
640	_, err := models.ParseConsistencyLevel(cmd)
641	if err != nil {
642		fmt.Printf("Unknown consistency level %q. Please use any, one, quorum, or all.\n", cmd)
643		return
644	}
645	c.ClientConfig.WriteConsistency = cmd
646}
647
648// isWhitespace returns true if the rune is a space, tab, or newline.
649func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' }
650
651// isLetter returns true if the rune is a letter.
652func isLetter(ch rune) bool { return (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') }
653
654// isDigit returns true if the rune is a digit.
655func isDigit(ch rune) bool { return (ch >= '0' && ch <= '9') }
656
657// isIdentFirstChar returns true if the rune can be used as the first char in an unquoted identifer.
658func isIdentFirstChar(ch rune) bool { return isLetter(ch) || ch == '_' }
659
660// isIdentChar returns true if the rune can be used in an unquoted identifier.
661func isNotIdentChar(ch rune) bool { return !(isLetter(ch) || isDigit(ch) || ch == '_') }
662
663func parseUnquotedIdentifier(stmt string) (string, string) {
664	if fields := strings.FieldsFunc(stmt, isNotIdentChar); len(fields) > 0 {
665		return fields[0], strings.TrimPrefix(stmt, fields[0])
666	}
667	return "", stmt
668}
669
670func parseDoubleQuotedIdentifier(stmt string) (string, string) {
671	escapeNext := false
672	fields := strings.FieldsFunc(stmt, func(ch rune) bool {
673		if ch == '\\' {
674			escapeNext = true
675		} else if ch == '"' {
676			if !escapeNext {
677				return true
678			}
679			escapeNext = false
680		}
681		return false
682	})
683	if len(fields) > 0 {
684		return fields[0], strings.TrimPrefix(stmt, "\""+fields[0]+"\"")
685	}
686	return "", stmt
687}
688
689func parseNextIdentifier(stmt string) (ident, remainder string) {
690	if len(stmt) > 0 {
691		switch {
692		case isWhitespace(rune(stmt[0])):
693			return parseNextIdentifier(stmt[1:])
694		case isIdentFirstChar(rune(stmt[0])):
695			return parseUnquotedIdentifier(stmt)
696		case stmt[0] == '"':
697			return parseDoubleQuotedIdentifier(stmt)
698		}
699	}
700	return "", stmt
701}
702
703func (c *CommandLine) parseInto(stmt string) *client.BatchPoints {
704	ident, stmt := parseNextIdentifier(stmt)
705	db, rp := c.Database, c.RetentionPolicy
706	if strings.HasPrefix(stmt, ".") {
707		db = ident
708		ident, stmt = parseNextIdentifier(stmt[1:])
709	}
710	if strings.HasPrefix(stmt, " ") {
711		rp = ident
712		stmt = stmt[1:]
713	}
714
715	return &client.BatchPoints{
716		Points: []client.Point{
717			client.Point{Raw: stmt},
718		},
719		Database:         db,
720		RetentionPolicy:  rp,
721		Precision:        c.ClientConfig.Precision,
722		WriteConsistency: c.ClientConfig.WriteConsistency,
723	}
724}
725
726func (c *CommandLine) parseInsert(stmt string) (*client.BatchPoints, error) {
727	i, point := parseNextIdentifier(stmt)
728	if !strings.EqualFold(i, "insert") {
729		return nil, fmt.Errorf("found %s, expected INSERT", i)
730	}
731	if i, r := parseNextIdentifier(point); strings.EqualFold(i, "into") {
732		bp := c.parseInto(r)
733		return bp, nil
734	}
735	return &client.BatchPoints{
736		Points: []client.Point{
737			client.Point{Raw: point},
738		},
739		Database:         c.Database,
740		RetentionPolicy:  c.RetentionPolicy,
741		Precision:        c.ClientConfig.Precision,
742		WriteConsistency: c.ClientConfig.WriteConsistency,
743	}, nil
744}
745
746// Insert runs an INSERT statement.
747func (c *CommandLine) Insert(stmt string) error {
748	bp, err := c.parseInsert(stmt)
749	if err != nil {
750		fmt.Printf("ERR: %s\n", err)
751		return nil
752	}
753	if _, err := c.Client.Write(*bp); err != nil {
754		fmt.Printf("ERR: %s\n", err)
755		if c.Database == "" {
756			fmt.Println("Note: error may be due to not setting a database or retention policy.")
757			fmt.Println(`Please set a database with the command "use <database>" or`)
758			fmt.Println("INSERT INTO <database>.<retention-policy> <point>")
759		}
760	}
761	return nil
762}
763
764// query creates a query struct to be used with the client.
765func (c *CommandLine) query(query string) client.Query {
766	return client.Query{
767		Command:         query,
768		Database:        c.Database,
769		RetentionPolicy: c.RetentionPolicy,
770		Chunked:         c.Chunked,
771		ChunkSize:       c.ChunkSize,
772		NodeID:          c.NodeID,
773	}
774}
775
776// ExecuteQuery runs any query statement.
777func (c *CommandLine) ExecuteQuery(query string) error {
778	// If we have a retention policy, we need to rewrite the statement sources
779	if c.RetentionPolicy != "" {
780		pq, err := influxql.NewParser(strings.NewReader(query)).ParseQuery()
781		if err != nil {
782			fmt.Printf("ERR: %s\n", err)
783			return err
784		}
785		for _, stmt := range pq.Statements {
786			if selectStatement, ok := stmt.(*influxql.SelectStatement); ok {
787				influxql.WalkFunc(selectStatement.Sources, func(n influxql.Node) {
788					if t, ok := n.(*influxql.Measurement); ok {
789						if t.Database == "" && c.Database != "" {
790							t.Database = c.Database
791						}
792						if t.RetentionPolicy == "" && c.RetentionPolicy != "" {
793							t.RetentionPolicy = c.RetentionPolicy
794						}
795					}
796				})
797			}
798		}
799		query = pq.String()
800	}
801
802	ctx := context.Background()
803	if !c.IgnoreSignals {
804		done := make(chan struct{})
805		defer close(done)
806
807		var cancel func()
808		ctx, cancel = context.WithCancel(ctx)
809		go func() {
810			select {
811			case <-done:
812			case <-c.osSignals:
813				cancel()
814			}
815		}()
816	}
817
818	response, err := c.Client.QueryContext(ctx, c.query(query))
819	if err != nil {
820		if err.Error() == "" {
821			err = ctx.Err()
822			if err == context.Canceled {
823				err = errors.New("aborted by user")
824			} else if err == nil {
825				err = errors.New("no data received")
826			}
827		}
828		fmt.Printf("ERR: %s\n", err)
829		return err
830	}
831	c.FormatResponse(response, os.Stdout)
832	if err := response.Error(); err != nil {
833		fmt.Printf("ERR: %s\n", response.Error())
834		if c.Database == "" {
835			fmt.Println("Warning: It is possible this error is due to not setting a database.")
836			fmt.Println(`Please set a database with the command "use <database>".`)
837		}
838		return err
839	}
840	return nil
841}
842
843// FormatResponse formats output to the previously chosen format.
844func (c *CommandLine) FormatResponse(response *client.Response, w io.Writer) {
845	switch c.Format {
846	case "json":
847		c.writeJSON(response, w)
848	case "csv":
849		c.writeCSV(response, w)
850	case "column":
851		c.writeColumns(response, w)
852	default:
853		fmt.Fprintf(w, "Unknown output format %q.\n", c.Format)
854	}
855}
856
857func (c *CommandLine) writeJSON(response *client.Response, w io.Writer) {
858	var data []byte
859	var err error
860	if c.Pretty {
861		data, err = json.MarshalIndent(response, "", "    ")
862	} else {
863		data, err = json.Marshal(response)
864	}
865	if err != nil {
866		fmt.Fprintf(w, "Unable to parse json: %s\n", err)
867		return
868	}
869	fmt.Fprintln(w, string(data))
870}
871
872func tagsEqual(prev, current map[string]string) bool {
873	return reflect.DeepEqual(prev, current)
874}
875
876func columnsEqual(prev, current []string) bool {
877	return reflect.DeepEqual(prev, current)
878}
879
880func headersEqual(prev, current models.Row) bool {
881	if prev.Name != current.Name {
882		return false
883	}
884	return tagsEqual(prev.Tags, current.Tags) && columnsEqual(prev.Columns, current.Columns)
885}
886
887func (c *CommandLine) writeCSV(response *client.Response, w io.Writer) {
888	csvw := csv.NewWriter(w)
889	var previousHeaders models.Row
890	for _, result := range response.Results {
891		suppressHeaders := len(result.Series) > 0 && headersEqual(previousHeaders, result.Series[0])
892		if !suppressHeaders && len(result.Series) > 0 {
893			previousHeaders = models.Row{
894				Name:    result.Series[0].Name,
895				Tags:    result.Series[0].Tags,
896				Columns: result.Series[0].Columns,
897			}
898		}
899
900		// Create a tabbed writer for each result as they won't always line up
901		rows := c.formatResults(result, "\t", suppressHeaders)
902		for _, r := range rows {
903			csvw.Write(strings.Split(r, "\t"))
904		}
905	}
906	csvw.Flush()
907}
908
909func (c *CommandLine) writeColumns(response *client.Response, w io.Writer) {
910	// Create a tabbed writer for each result as they won't always line up
911	writer := new(tabwriter.Writer)
912	writer.Init(w, 0, 8, 1, ' ', 0)
913
914	var previousHeaders models.Row
915	for i, result := range response.Results {
916		// Print out all messages first
917		for _, m := range result.Messages {
918			fmt.Fprintf(w, "%s: %s.\n", m.Level, m.Text)
919		}
920		// Check to see if the headers are the same as the previous row.  If so, suppress them in the output
921		suppressHeaders := len(result.Series) > 0 && headersEqual(previousHeaders, result.Series[0])
922		if !suppressHeaders && len(result.Series) > 0 {
923			previousHeaders = models.Row{
924				Name:    result.Series[0].Name,
925				Tags:    result.Series[0].Tags,
926				Columns: result.Series[0].Columns,
927			}
928		}
929
930		// If we are suppressing headers, don't output the extra line return. If we
931		// aren't suppressing headers, then we put out line returns between results
932		// (not before the first result, and not after the last result).
933		if !suppressHeaders && i > 0 {
934			fmt.Fprintln(writer, "")
935		}
936
937		rows := c.formatResults(result, "\t", suppressHeaders)
938		for _, r := range rows {
939			fmt.Fprintln(writer, r)
940		}
941
942	}
943	writer.Flush()
944}
945
946// formatResults will behave differently if you are formatting for columns or csv
947func (c *CommandLine) formatResults(result client.Result, separator string, suppressHeaders bool) []string {
948	rows := []string{}
949	// Create a tabbed writer for each result as they won't always line up
950	for i, row := range result.Series {
951		// gather tags
952		tags := []string{}
953		for k, v := range row.Tags {
954			tags = append(tags, fmt.Sprintf("%s=%s", k, v))
955			sort.Strings(tags)
956		}
957
958		columnNames := []string{}
959
960		// Only put name/tags in a column if format is csv
961		if c.Format == "csv" {
962			if len(tags) > 0 {
963				columnNames = append([]string{"tags"}, columnNames...)
964			}
965
966			if row.Name != "" {
967				columnNames = append([]string{"name"}, columnNames...)
968			}
969		}
970
971		columnNames = append(columnNames, row.Columns...)
972
973		// Output a line separator if we have more than one set or results and format is column
974		if i > 0 && c.Format == "column" && !suppressHeaders {
975			rows = append(rows, "")
976		}
977
978		// If we are column format, we break out the name/tag to separate lines
979		if c.Format == "column" && !suppressHeaders {
980			if row.Name != "" {
981				n := fmt.Sprintf("name: %s", row.Name)
982				rows = append(rows, n)
983			}
984			if len(tags) > 0 {
985				t := fmt.Sprintf("tags: %s", (strings.Join(tags, ", ")))
986				rows = append(rows, t)
987			}
988		}
989
990		if !suppressHeaders {
991			rows = append(rows, strings.Join(columnNames, separator))
992		}
993
994		// if format is column, write dashes under each column
995		if c.Format == "column" && !suppressHeaders {
996			lines := []string{}
997			for _, columnName := range columnNames {
998				lines = append(lines, strings.Repeat("-", len(columnName)))
999			}
1000			rows = append(rows, strings.Join(lines, separator))
1001		}
1002
1003		for _, v := range row.Values {
1004			var values []string
1005			if c.Format == "csv" {
1006				if row.Name != "" {
1007					values = append(values, row.Name)
1008				}
1009				if len(tags) > 0 {
1010					values = append(values, strings.Join(tags, ","))
1011				}
1012			}
1013
1014			for _, vv := range v {
1015				values = append(values, interfaceToString(vv))
1016			}
1017			rows = append(rows, strings.Join(values, separator))
1018		}
1019	}
1020	return rows
1021}
1022
1023func interfaceToString(v interface{}) string {
1024	switch t := v.(type) {
1025	case nil:
1026		return ""
1027	case bool:
1028		return fmt.Sprintf("%v", v)
1029	case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, uintptr:
1030		return fmt.Sprintf("%d", t)
1031	case float32, float64:
1032		return fmt.Sprintf("%v", t)
1033	default:
1034		return fmt.Sprintf("%v", t)
1035	}
1036}
1037
1038// Settings prints current settings.
1039func (c *CommandLine) Settings() {
1040	w := new(tabwriter.Writer)
1041	w.Init(os.Stdout, 0, 1, 1, ' ', 0)
1042	fmt.Fprintln(w, "Setting\tValue")
1043	fmt.Fprintln(w, "--------\t--------")
1044	fmt.Fprintf(w, "URL\t%s\n", c.URL.String())
1045	fmt.Fprintf(w, "Username\t%s\n", c.ClientConfig.Username)
1046	fmt.Fprintf(w, "Database\t%s\n", c.Database)
1047	fmt.Fprintf(w, "RetentionPolicy\t%s\n", c.RetentionPolicy)
1048	fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty)
1049	fmt.Fprintf(w, "Format\t%s\n", c.Format)
1050	fmt.Fprintf(w, "Write Consistency\t%s\n", c.ClientConfig.WriteConsistency)
1051	fmt.Fprintf(w, "Chunked\t%v\n", c.Chunked)
1052	fmt.Fprintf(w, "Chunk Size\t%d\n", c.ChunkSize)
1053	fmt.Fprintln(w)
1054	w.Flush()
1055}
1056
1057func (c *CommandLine) help() {
1058	fmt.Println(`Usage:
1059        connect <host:port>   connects to another node specified by host:port
1060        auth                  prompts for username and password
1061        pretty                toggles pretty print for the json format
1062        chunked               turns on chunked responses from server
1063        chunk size <size>     sets the size of the chunked responses.  Set to 0 to reset to the default chunked size
1064        use <db_name>         sets current database
1065        format <format>       specifies the format of the server responses: json, csv, or column
1066        precision <format>    specifies the format of the timestamp: rfc3339, h, m, s, ms, u or ns
1067        consistency <level>   sets write consistency level: any, one, quorum, or all
1068        history               displays command history
1069        settings              outputs the current settings for the shell
1070        clear                 clears settings such as database or retention policy.  run 'clear' for help
1071        exit/quit/ctrl+d      quits the influx shell
1072
1073        show databases        show database names
1074        show series           show series information
1075        show measurements     show measurement information
1076        show tag keys         show tag key information
1077        show field keys       show field key information
1078
1079        A full list of influxql commands can be found at:
1080        https://docs.influxdata.com/influxdb/latest/query_language/spec/`)
1081}
1082
1083func (c *CommandLine) history() {
1084	var buf bytes.Buffer
1085	c.Line.WriteHistory(&buf)
1086	fmt.Print(buf.String())
1087}
1088
1089func (c *CommandLine) saveHistory() {
1090	if c.historyFilePath == "" {
1091		return
1092	}
1093	if historyFile, err := os.Create(c.historyFilePath); err != nil {
1094		fmt.Printf("There was an error writing history file: %s\n", err)
1095	} else {
1096		c.Line.WriteHistory(historyFile)
1097		historyFile.Close()
1098	}
1099}
1100
1101func (c *CommandLine) gopher() {
1102	fmt.Println(`
1103                                          .-::-::://:-::-    .:/++/'
1104                                     '://:-''/oo+//++o+/.://o-    ./+:
1105                                  .:-.    '++-         .o/ '+yydhy'  o-
1106                               .:/.      .h:         :osoys  .smMN-  :/
1107                            -/:.'        s-         /MMMymh.   '/y/  s'
1108                         -+s:''''        d          -mMMms//     '-/o:
1109                       -/++/++/////:.    o:          '... s-        :s.
1110                     :+-+s-'       ':/'  's-             /+          'o:
1111                   '+-'o:        /ydhsh.  '//.        '-o-             o-
1112                  .y. o:        .MMMdm+y    ':+++:::/+:.'               s:
1113                .-h/  y-        'sdmds'h -+ydds:::-.'                   'h.
1114             .//-.d'  o:          '.' 'dsNMMMNh:.:++'                    :y
1115            +y.  'd   's.            .s:mddds:     ++                     o/
1116           'N-  odd    'o/.       './o-s-'   .---+++'                      o-
1117           'N'  yNd      .://:/:::::. -s   -+/s/./s'                       'o/'
1118            so'  .h         ''''       ////s: '+. .s                         +y'
1119             os/-.y'                       's' 'y::+                          +d'
1120               '.:o/                        -+:-:.'                            so.---.'
1121                   o'                                                          'd-.''/s'
1122                   .s'                                                          :y.''.y
1123                    -s                                                           mo:::'
1124                     ::                                                          yh
1125                      //                                      ''''               /M'
1126                       o+                                    .s///:/.            'N:
1127                        :+                                   /:    -s'            ho
1128                         's-                               -/s/:+/.+h'            +h
1129                           ys'                            ':'    '-.              -d
1130                            oh                                                    .h
1131                             /o                                                   .s
1132                              s.                                                  .h
1133                              -y                                                  .d
1134                               m/                                                 -h
1135                               +d                                                 /o
1136                               'N-                                                y:
1137                                h:                                                m.
1138                                s-                                               -d
1139                                o-                                               s+
1140                                +-                                              'm'
1141                                s/                                              oo--.
1142                                y-                                             /s  ':+'
1143                                s'                                           'od--' .d:
1144                                -+                                         ':o: ':+-/+
1145                                 y-                                      .:+-      '
1146                                //o-                                 '.:+/.
1147                                .-:+/'                           ''-/+/.
1148                                    ./:'                    ''.:o+/-'
1149                                      .+o:/:/+-'      ''.-+ooo/-'
1150                                         o:   -h///++////-.
1151                                        /:   .o/
1152                                       //+  'y
1153                                       ./sooy.`)
1154}
1155
1156// Version prints the CLI version.
1157func (c *CommandLine) Version() {
1158	fmt.Println("InfluxDB shell version:", c.ClientVersion)
1159}
1160
1161func (c *CommandLine) exit() {
1162	// write to history file
1163	c.saveHistory()
1164	// release line resources
1165	c.Line.Close()
1166	c.Line = nil
1167}
1168
1169func (c *CommandLine) ExecuteFluxQuery(query string) error {
1170	ctx := context.Background()
1171	if !c.IgnoreSignals {
1172		done := make(chan struct{})
1173		defer close(done)
1174
1175		var cancel func()
1176		ctx, cancel = context.WithCancel(ctx)
1177		go func() {
1178			select {
1179			case <-done:
1180			case <-c.osSignals:
1181				cancel()
1182			}
1183		}()
1184	}
1185
1186	repl, err := getFluxREPL(c.URL, c.ClientConfig.Username, c.ClientConfig.Password)
1187	if err != nil {
1188		return err
1189	}
1190
1191	return repl.Input(query)
1192}
1193
1194type QueryLanguage uint8
1195
1196const (
1197	QueryLanguageInfluxQL QueryLanguage = iota
1198	QueryLanguageFlux
1199)
1200
1201func (l *QueryLanguage) Set(s string) error {
1202	switch s {
1203	case "influxql":
1204		*l = QueryLanguageInfluxQL
1205	case "flux":
1206		*l = QueryLanguageFlux
1207	default:
1208		return fmt.Errorf("%q not supported: specify influxql or flux", s)
1209	}
1210	return nil
1211}
1212
1213func (l *QueryLanguage) String() string {
1214	switch *l {
1215	case QueryLanguageInfluxQL:
1216		return "influxql"
1217	case QueryLanguageFlux:
1218		return "flux"
1219	}
1220	return fmt.Sprintf("QueryLanguage(%d)", uint8(*l))
1221}
1222