1package exec
2
3import (
4	"bytes"
5	"encoding/json"
6	"flag"
7	"fmt"
8	"io"
9	"os"
10	"path"
11	"regexp"
12	"strconv"
13	"strings"
14	"time"
15	"unicode"
16
17	"github.com/hashicorp/consul/api"
18	"github.com/hashicorp/consul/command/flags"
19	"github.com/mitchellh/cli"
20)
21
22func New(ui cli.Ui, shutdownCh <-chan struct{}) *cmd {
23	c := &cmd{UI: ui, shutdownCh: shutdownCh}
24	c.init()
25	return c
26}
27
// cmd implements "consul exec": it uploads a job specification to the KV
// store under a dedicated session, fires a user event so that matching
// nodes pick it up, and streams their results back to the UI.
type cmd struct {
	// UI receives all command output.
	UI    cli.Ui
	// flags is the flag set built by init; http holds the shared HTTP
	// client/server flags that are merged into it.
	flags *flag.FlagSet
	http  *flags.HTTPFlags
	// help is the usage text rendered by init.
	help  string

	// shutdownCh aborts the replication wait in Run and the result loop in
	// waitForJob when it fires.
	shutdownCh <-chan struct{}
	// conf holds the parsed flags plus the command, args or script to run.
	conf       rExecConf
	// apiclient is the Consul API client created in Run.
	apiclient  *api.Client
	// sessionID is the session under which the job data is stored and
	// acquired.
	sessionID  string
	// stopCh is created by createSession to control the renewal goroutine;
	// destroySession closes it and resets it to nil.
	stopCh     chan struct{}
}
40
// init registers the exec-specific flags (all bound to fields of c.conf),
// merges in the shared HTTP client and server flags, and renders the help
// text from the resulting flag set.
func (c *cmd) init() {
	fs := flag.NewFlagSet("", flag.ContinueOnError)
	conf := &c.conf

	// Target selection filters.
	fs.StringVar(&conf.node, "node", "",
		"Regular expression to filter on node names.")
	fs.StringVar(&conf.service, "service", "",
		"Regular expression to filter on service instances.")
	fs.StringVar(&conf.tag, "tag", "",
		"Regular expression to filter on service tags. Must be used with -service.")

	// Storage and execution behavior.
	fs.StringVar(&conf.prefix, "prefix", rExecPrefix,
		"Prefix in the KV store to use for request data.")
	fs.BoolVar(&conf.shell, "shell", true,
		"Use a shell to run the command.")

	// Timing.
	fs.DurationVar(&conf.wait, "wait", rExecQuietWait,
		"Period to wait with no responses before terminating execution.")
	fs.DurationVar(&conf.replWait, "wait-repl", rExecReplicationWait,
		"Period to wait for replication before firing event. This is an optimization to allow stale reads to be performed.")

	fs.BoolVar(&conf.verbose, "verbose", false,
		"Enables verbose output.")

	c.flags = fs
	c.http = &flags.HTTPFlags{}
	flags.Merge(c.flags, c.http.ClientFlags())
	flags.Merge(c.flags, c.http.ServerFlags())
	c.help = flags.Usage(help, c.flags)
}
65
// Run executes the command. It parses flags, determines the command line,
// argument list or stdin script to run, uploads the job specification under
// a new session, fires the remote-exec user event and then waits for
// results.
//
// It returns 1 for local failures (bad flags or filters, agent, session or
// KV errors, or a shutdown signal during the replication wait); otherwise it
// returns the result of waitForJob.
func (c *cmd) Run(args []string) int {
	if err := c.flags.Parse(args); err != nil {
		return 1
	}

	// Join the positional arguments into the command to execute.
	c.conf.cmd = strings.Join(c.flags.Args(), " ")

	if c.conf.cmd == "-" {
		// A lone "-" means the script is read from stdin until EOF; this
		// mode is rejected when -shell=false.
		if !c.conf.shell {
			c.UI.Error("Cannot configure -shell=false when reading from stdin")
			return 1
		}

		c.conf.cmd = ""
		var buf bytes.Buffer
		if _, err := io.Copy(&buf, os.Stdin); err != nil {
			c.UI.Error(fmt.Sprintf("Failed to read stdin: %v", err))
			c.UI.Error("")
			c.UI.Error(c.Help())
			return 1
		}
		c.conf.script = buf.Bytes()
	} else if !c.conf.shell {
		// Without a shell the arguments are shipped as a list instead of
		// a single joined command line.
		c.conf.cmd = ""
		c.conf.args = c.flags.Args()
	}

	// Ensure we have a command or script
	if c.conf.cmd == "" && len(c.conf.script) == 0 && len(c.conf.args) == 0 {
		c.UI.Error("Must specify a command to execute")
		c.UI.Error("")
		c.UI.Error(c.Help())
		return 1
	}

	// Validate the configuration
	if err := c.conf.validate(); err != nil {
		c.UI.Error(err.Error())
		return 1
	}

	// Create and test the HTTP client
	client, err := c.http.APIClient()
	if err != nil {
		c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
		return 1
	}
	info, err := client.Agent().Self()
	if err != nil {
		c.UI.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
		return 1
	}
	c.apiclient = client

	// Check if this is a foreign datacenter. If so, remember the local
	// datacenter and node; createSessionForeign uses them to name the
	// session it binds to a remote node.
	if dc := c.http.Datacenter(); dc != "" && dc != info["Config"]["Datacenter"] {
		// Use checked assertions so an unexpected agent response is
		// reported as an error instead of panicking.
		localDC, dcOK := info["Config"]["Datacenter"].(string)
		localNode, nodeOK := info["Config"]["NodeName"].(string)
		if !dcOK || !nodeOK {
			c.UI.Error("Error querying Consul agent: response is missing Config.Datacenter or Config.NodeName")
			return 1
		}
		if c.conf.verbose {
			c.UI.Info("Remote exec in foreign datacenter, using Session TTL")
		}
		c.conf.foreignDC = true
		c.conf.localDC = localDC
		c.conf.localNode = localNode
	}

	// Create the job spec
	spec, err := c.makeRExecSpec()
	if err != nil {
		c.UI.Error(fmt.Sprintf("Failed to create job spec: %s", err))
		return 1
	}

	// Create a session for this
	c.sessionID, err = c.createSession()
	if err != nil {
		c.UI.Error(fmt.Sprintf("Failed to create session: %s", err))
		return 1
	}
	defer c.destroySession()
	if c.conf.verbose {
		c.UI.Info(fmt.Sprintf("Created remote execution session: %s", c.sessionID))
	}

	// Upload the payload
	if err := c.uploadPayload(spec); err != nil {
		c.UI.Error(fmt.Sprintf("Failed to create job file: %s", err))
		return 1
	}
	// Deferred calls run last-in first-out, so the data is removed before
	// the session deferred above is destroyed.
	defer c.destroyData()
	if c.conf.verbose {
		c.UI.Info("Uploaded remote execution spec")
	}

	// Wait for replication. This is done so that when the event is
	// received, the job file can be read using a stale read. If the
	// stale read fails, we expect a consistent read to be done, so
	// largely this is a heuristic.
	select {
	case <-time.After(c.conf.replWait):
	case <-c.shutdownCh:
		return 1
	}

	// Fire the event
	id, err := c.fireEvent()
	if err != nil {
		c.UI.Error(fmt.Sprintf("Failed to fire event: %s", err))
		return 1
	}
	if c.conf.verbose {
		c.UI.Info(fmt.Sprintf("Fired remote execution event: %s", id))
	}

	// Wait for the job to finish now
	return c.waitForJob()
}
184
185func (c *cmd) Synopsis() string {
186	return synopsis
187}
188
189func (c *cmd) Help() string {
190	return c.help
191}
192
193const synopsis = "Executes a command on Consul nodes"
194const help = `
195Usage: consul exec [options] [-|command...]
196
197  Evaluates a command on remote Consul nodes. The nodes responding can
198  be filtered using regular expressions on node name, service, and tag
199  definitions. If a command is '-', stdin will be read until EOF
200  and used as a script input.
201`
202
203// waitForJob is used to poll for results and wait until the job is terminated
204func (c *cmd) waitForJob() int {
205	// Although the session destroy is already deferred, we do it again here,
206	// because invalidation of the session before destroyData() ensures there is
207	// no race condition allowing an agent to upload data (the acquire will fail).
208	defer c.destroySession()
209	start := time.Now()
210	ackCh := make(chan rExecAck, 128)
211	heartCh := make(chan rExecHeart, 128)
212	outputCh := make(chan rExecOutput, 128)
213	exitCh := make(chan rExecExit, 128)
214	doneCh := make(chan struct{})
215	errCh := make(chan struct{}, 1)
216	defer close(doneCh)
217	go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)
218	target := &TargetedUI{UI: c.UI}
219
220	var ackCount, exitCount, badExit int
221OUTER:
222	for {
223		// Determine wait time. We provide a larger window if we know about
224		// nodes which are still working.
225		waitIntv := c.conf.wait
226		if ackCount > exitCount {
227			waitIntv *= 2
228		}
229
230		select {
231		case e := <-ackCh:
232			ackCount++
233			if c.conf.verbose {
234				target.Target = e.Node
235				target.Info("acknowledged")
236			}
237
238		case h := <-heartCh:
239			if c.conf.verbose {
240				target.Target = h.Node
241				target.Info("heartbeat received")
242			}
243
244		case e := <-outputCh:
245			target.Target = e.Node
246			target.Output(string(e.Output))
247
248		case e := <-exitCh:
249			exitCount++
250			target.Target = e.Node
251			target.Info(fmt.Sprintf("finished with exit code %d", e.Code))
252			if e.Code != 0 {
253				badExit++
254			}
255
256		case <-time.After(waitIntv):
257			c.UI.Info(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount))
258			if c.conf.verbose {
259				c.UI.Info(fmt.Sprintf("Completed in %0.2f seconds",
260					float64(time.Since(start))/float64(time.Second)))
261			}
262			if exitCount < ackCount {
263				badExit++
264			}
265			break OUTER
266
267		case <-errCh:
268			return 1
269
270		case <-c.shutdownCh:
271			return 1
272		}
273	}
274
275	if badExit > 0 {
276		return 2
277	}
278	return 0
279}
280
281// streamResults is used to perform blocking queries against the KV endpoint and stream in
282// notice of various events into waitForJob
283func (c *cmd) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
284	outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) {
285	kv := c.apiclient.KV()
286	opts := api.QueryOptions{WaitTime: c.conf.wait}
287	dir := path.Join(c.conf.prefix, c.sessionID) + "/"
288	seen := make(map[string]struct{})
289
290	for {
291		// Check if we've been signaled to exit
292		select {
293		case <-doneCh:
294			return
295		default:
296		}
297
298		// Block on waiting for new keys
299		keys, qm, err := kv.Keys(dir, "", &opts)
300		if err != nil {
301			c.UI.Error(fmt.Sprintf("Failed to read results: %s", err))
302			goto ERR_EXIT
303		}
304
305		// Fast-path the no-change case
306		if qm.LastIndex == opts.WaitIndex {
307			continue
308		}
309		opts.WaitIndex = qm.LastIndex
310
311		// Handle each key
312		for _, key := range keys {
313			// Ignore if we've seen it
314			if _, ok := seen[key]; ok {
315				continue
316			}
317			seen[key] = struct{}{}
318
319			// Trim the directory
320			full := key
321			key = strings.TrimPrefix(key, dir)
322
323			// Handle the key type
324			switch {
325			case key == rExecFileName:
326				continue
327			case strings.HasSuffix(key, rExecAckSuffix):
328				ackCh <- rExecAck{Node: strings.TrimSuffix(key, rExecAckSuffix)}
329
330			case strings.HasSuffix(key, rExecExitSuffix):
331				pair, _, err := kv.Get(full, nil)
332				if err != nil || pair == nil {
333					c.UI.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
334					continue
335				}
336				code, err := strconv.ParseInt(string(pair.Value), 10, 32)
337				if err != nil {
338					c.UI.Error(fmt.Sprintf("Failed to parse exit code '%s': %v", pair.Value, err))
339					continue
340				}
341				exitCh <- rExecExit{
342					Node: strings.TrimSuffix(key, rExecExitSuffix),
343					Code: int(code),
344				}
345
346			case strings.LastIndex(key, rExecOutputDivider) != -1:
347				pair, _, err := kv.Get(full, nil)
348				if err != nil || pair == nil {
349					c.UI.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
350					continue
351				}
352				idx := strings.LastIndex(key, rExecOutputDivider)
353				node := key[:idx]
354				if len(pair.Value) == 0 {
355					heartCh <- rExecHeart{Node: node}
356				} else {
357					outputCh <- rExecOutput{Node: node, Output: pair.Value}
358				}
359
360			default:
361				c.UI.Error(fmt.Sprintf("Unknown key '%s', ignoring.", key))
362			}
363		}
364	}
365
366ERR_EXIT:
367	select {
368	case errCh <- struct{}{}:
369	default:
370	}
371}
372
// validate checks that the node, service and tag filters compile as regular
// expressions, in that order, and that a tag filter is only given together
// with a service filter. Empty filters are not checked. It returns the first
// problem found, or nil.
func (conf *rExecConf) validate() error {
	patterns := []struct {
		expr   string
		format string
	}{
		{conf.node, "Failed to compile node filter regexp: %v"},
		{conf.service, "Failed to compile service filter regexp: %v"},
		{conf.tag, "Failed to compile tag filter regexp: %v"},
	}
	for _, p := range patterns {
		if p.expr == "" {
			continue
		}
		if _, err := regexp.Compile(p.expr); err != nil {
			return fmt.Errorf(p.format, err)
		}
	}

	if conf.service == "" && conf.tag != "" {
		return fmt.Errorf("Cannot provide tag filter without service filter.")
	}
	return nil
}
396
397// createSession is used to create a new session for this command
398func (c *cmd) createSession() (string, error) {
399	var id string
400	var err error
401	if c.conf.foreignDC {
402		id, err = c.createSessionForeign()
403	} else {
404		id, err = c.createSessionLocal()
405	}
406	if err == nil {
407		c.stopCh = make(chan struct{})
408		go c.renewSession(id, c.stopCh)
409	}
410	return id, err
411}
412
413// createSessionLocal is used to create a new session in a local datacenter
414// This is simpler since we can use the local agent to create the session.
415func (c *cmd) createSessionLocal() (string, error) {
416	session := c.apiclient.Session()
417	se := api.SessionEntry{
418		Name:     "Remote Exec",
419		Behavior: api.SessionBehaviorDelete,
420		TTL:      rExecTTL,
421	}
422	id, _, err := session.Create(&se, nil)
423	return id, err
424}
425
// createSessionForeign is used to create a new session in a foreign
// datacenter. This is more complex, since the local agent cannot be used to
// create the session and we must associate it with a node in the remote
// datacenter.
//
// The session is bound to the first node returned by a passing-only health
// query for the "consul" service, created without health checks
// (CreateNoChecks), and named after the local node and datacenter that
// started it. It uses the "delete" behavior and the rExecTTL time-to-live.
func (c *cmd) createSessionForeign() (string, error) {
	// Look for a remote node to bind to
	health := c.apiclient.Health()
	services, _, err := health.Service("consul", "", true, nil)
	if err != nil {
		return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err)
	}
	if len(services) == 0 {
		return "", fmt.Errorf("Failed to find Consul server in remote datacenter")
	}
	node := services[0].Node.Node
	if c.conf.verbose {
		c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s", node, c.http.Datacenter()))
	}

	session := c.apiclient.Session()
	se := api.SessionEntry{
		Name:     fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC),
		Node:     node,
		Checks:   []string{},
		Behavior: api.SessionBehaviorDelete,
		TTL:      rExecTTL,
	}
	id, _, err := session.CreateNoChecks(&se, nil)
	return id, err
}
455
// renewSession is a long-running routine that renews session id every
// rExecRenewInterval so that its rExecTTL time-to-live does not lapse while
// the exec is in progress. createSession starts it for every session, local
// or foreign, since both are created with a TTL.
//
// It returns when stopCh is closed, or after reporting the first failed
// renewal to the UI; no further renewals are attempted in that case.
func (c *cmd) renewSession(id string, stopCh chan struct{}) {
	session := c.apiclient.Session()
	ticker := time.NewTicker(rExecRenewInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if _, _, err := session.Renew(id, nil); err != nil {
				c.UI.Error(fmt.Sprintf("Session renew failed: %v", err))
				return
			}
		case <-stopCh:
			return
		}
	}
}
474
475// destroySession is used to destroy the associated session
476func (c *cmd) destroySession() error {
477	// Stop the session renew if any
478	if c.stopCh != nil {
479		close(c.stopCh)
480		c.stopCh = nil
481	}
482
483	// Destroy the session explicitly
484	session := c.apiclient.Session()
485	_, err := session.Destroy(c.sessionID, nil)
486	return err
487}
488
489// makeRExecSpec creates a serialized job specification
490// that can be uploaded which will be parsed by agents to
491// determine what to do.
492func (c *cmd) makeRExecSpec() ([]byte, error) {
493	spec := &rExecSpec{
494		Command: c.conf.cmd,
495		Args:    c.conf.args,
496		Script:  c.conf.script,
497		Wait:    c.conf.wait,
498	}
499	return json.Marshal(spec)
500}
501
// uploadPayload writes payload to <prefix>/<session>/job, acquiring the key
// with the exec session. It returns the KV error, or an error when the
// acquisition is refused.
func (c *cmd) uploadPayload(payload []byte) error {
	key := path.Join(c.conf.prefix, c.sessionID, rExecFileName)
	acquired, _, err := c.apiclient.KV().Acquire(&api.KVPair{
		Key:     key,
		Value:   payload,
		Session: c.sessionID,
	}, nil)

	switch {
	case err != nil:
		return err
	case !acquired:
		return fmt.Errorf("failed to acquire key %s", key)
	}
	return nil
}
519
// destroyData removes everything stored for this exec by deleting the
// <prefix>/<session> tree recursively, and returns any error from the
// delete.
func (c *cmd) destroyData() error {
	_, err := c.apiclient.KV().DeleteTree(path.Join(c.conf.prefix, c.sessionID), nil)
	return err
}
529
// fireEvent fires the "_rexec" user event. Its JSON payload carries the KV
// prefix and session ID, and the event is scoped by the node, service and tag
// filters. It returns the ID of the fired event, or an error.
func (c *cmd) fireEvent() (string, error) {
	payload, err := json.Marshal(&rExecEvent{
		Prefix:  c.conf.prefix,
		Session: c.sessionID,
	})
	if err != nil {
		return "", err
	}

	id, _, err := c.apiclient.Event().Fire(&api.UserEvent{
		Name:          "_rexec",
		Payload:       payload,
		NodeFilter:    c.conf.node,
		ServiceFilter: c.conf.service,
		TagFilter:     c.conf.tag,
	}, nil)
	return id, err
}
557
558const (
559	// rExecPrefix is the prefix in the KV store used to
560	// store the remote exec data
561	rExecPrefix = "_rexec"
562
563	// rExecFileName is the name of the file we append to
564	// the path, e.g. _rexec/session_id/job
565	rExecFileName = "job"
566
567	// rExecAck is the suffix added to an ack path
568	rExecAckSuffix = "/ack"
569
570	// rExecAck is the suffix added to an exit code
571	rExecExitSuffix = "/exit"
572
573	// rExecOutputDivider is used to namespace the output
574	rExecOutputDivider = "/out/"
575
576	// rExecReplicationWait is how long we wait for replication
577	rExecReplicationWait = 200 * time.Millisecond
578
579	// rExecQuietWait is how long we wait for no responses
580	// before assuming the job is done.
581	rExecQuietWait = 2 * time.Second
582
583	// rExecTTL is how long we default the session TTL to
584	rExecTTL = "15s"
585
586	// rExecRenewInterval is how often we renew the session TTL
587	// when doing an exec in a foreign DC.
588	rExecRenewInterval = 5 * time.Second
589)
590
591// rExecConf is used to pass around configuration
592type rExecConf struct {
593	prefix string
594	shell  bool
595
596	foreignDC bool
597	localDC   string
598	localNode string
599
600	node    string
601	service string
602	tag     string
603
604	wait     time.Duration
605	replWait time.Duration
606
607	cmd    string
608	args   []string
609	script []byte
610
611	verbose bool
612}
613
614// rExecEvent is the event we broadcast using a user-event
615type rExecEvent struct {
616	Prefix  string
617	Session string
618}
619
620// rExecSpec is the file we upload to specify the parameters
621// of the remote execution.
622type rExecSpec struct {
623	// Command is a single command to run directly in the shell
624	Command string `json:",omitempty"`
625
626	// Args is the list of arguments to run the subprocess directly
627	Args []string `json:",omitempty"`
628
629	// Script should be spilled to a file and executed
630	Script []byte `json:",omitempty"`
631
632	// Wait is how long we are waiting on a quiet period to terminate
633	Wait time.Duration
634}
635
636// rExecAck is used to transmit an acknowledgement
637type rExecAck struct {
638	Node string
639}
640
641// rExecHeart is used to transmit a heartbeat
642type rExecHeart struct {
643	Node string
644}
645
646// rExecOutput is used to transmit a chunk of output
647type rExecOutput struct {
648	Node   string
649	Output []byte
650}
651
652// rExecExit is used to transmit an exit code
653type rExecExit struct {
654	Node string
655	Code int
656}
657
658// TargetedUI is a UI that wraps another UI implementation and modifies
659// the output to indicate a specific target. Specifically, all Say output
660// is prefixed with the target name. Message output is not prefixed but
661// is offset by the length of the target so that output is lined up properly
662// with Say output. Machine-readable output has the proper target set.
663type TargetedUI struct {
664	Target string
665	UI     cli.Ui
666}
667
668func (u *TargetedUI) Ask(query string) (string, error) {
669	return u.UI.Ask(u.prefixLines(true, query))
670}
671
672func (u *TargetedUI) Info(message string) {
673	u.UI.Info(u.prefixLines(true, message))
674}
675
676func (u *TargetedUI) Output(message string) {
677	u.UI.Output(u.prefixLines(false, message))
678}
679
680func (u *TargetedUI) Error(message string) {
681	u.UI.Error(u.prefixLines(true, message))
682}
683
// prefixLines prepends "<marker> <target>: " to every line of message, where
// marker is "==>" when arrow is set and three spaces otherwise. Trailing
// whitespace of the combined result is trimmed.
func (u *TargetedUI) prefixLines(arrow bool, message string) string {
	marker := strings.Repeat(" ", 3)
	if arrow {
		marker = "==>"
	}

	var out bytes.Buffer
	lines := strings.Split(message, "\n")
	for i := range lines {
		fmt.Fprintf(&out, "%s %s: %s\n", marker, u.Target, lines[i])
	}
	return strings.TrimRightFunc(out.String(), unicode.IsSpace)
}
698