1package command
2
3import (
4	"fmt"
5	"sort"
6	"strings"
7	"sync"
8	"time"
9
10	"github.com/hashicorp/nomad/api"
11	"github.com/hashicorp/nomad/nomad/structs"
12	"github.com/mitchellh/cli"
13)
14
const (
	// updateWait is the amount of time to wait between status
	// updates. Because the monitor is poll-based, we use this
	// delay to avoid overwhelming the API server with Info and
	// Allocations queries while an evaluation is in flight.
	updateWait = time.Second
)
21
// evalState is used to store the current "state of the world"
// in the context of monitoring an evaluation. Two successive
// snapshots are diffed by monitor.update to decide what to print.
type evalState struct {
	status     string                 // current evaluation status
	desc       string                 // evaluation status description
	node       string                 // node ID that triggered the eval, if any
	deployment string                 // deployment ID the eval belongs to, if any
	job        string                 // job ID that triggered the eval, if any
	allocs     map[string]*allocState // allocations keyed by alloc ID
	wait       time.Duration          // eval wait time before processing
	index      uint64                 // eval create index, used to detect modified allocs
}
34
35// newEvalState creates and initializes a new monitorState
36func newEvalState() *evalState {
37	return &evalState{
38		status: structs.EvalStatusPending,
39		allocs: make(map[string]*allocState),
40	}
41}
42
// allocState is used to track the state of an allocation
// across monitor polls so that status transitions can be reported.
type allocState struct {
	id          string // allocation ID
	group       string // task group the alloc belongs to
	node        string // node ID the alloc was placed on
	desired     string // desired status (run, stop, ...)
	desiredDesc string // human-readable desired status description
	client      string // client-reported status (pending, running, ...)
	clientDesc  string // human-readable client status description
	index       uint64 // alloc create index, compared against the eval index
}
54
// monitor wraps an evaluation monitor and holds metadata and
// state information.
type monitor struct {
	ui     cli.Ui
	client *api.Client
	state  *evalState

	// length determines the number of characters for identifiers in the ui.
	length int

	// Embedded mutex guards state; monitor must not be copied.
	sync.Mutex
}
67
68// newMonitor returns a new monitor. The returned monitor will
69// write output information to the provided ui. The length parameter determines
70// the number of characters for identifiers in the ui.
71func newMonitor(ui cli.Ui, client *api.Client, length int) *monitor {
72	if colorUi, ok := ui.(*cli.ColoredUi); ok {
73		// Disable Info color for monitored output
74		ui = &cli.ColoredUi{
75			ErrorColor: colorUi.ErrorColor,
76			WarnColor:  colorUi.WarnColor,
77			InfoColor:  cli.UiColorNone,
78			Ui:         colorUi.Ui,
79		}
80	}
81	mon := &monitor{
82		ui: &cli.PrefixedUi{
83			InfoPrefix:   "==> ",
84			OutputPrefix: "    ",
85			ErrorPrefix:  "==> ",
86			Ui:           ui,
87		},
88		client: client,
89		state:  newEvalState(),
90		length: length,
91	}
92	return mon
93}
94
95// update is used to update our monitor with new state. It can be
96// called whether the passed information is new or not, and will
97// only dump update messages when state changes.
98func (m *monitor) update(update *evalState) {
99	m.Lock()
100	defer m.Unlock()
101
102	existing := m.state
103
104	// Swap in the new state at the end
105	defer func() {
106		m.state = update
107	}()
108
109	// Check if the evaluation was triggered by a node
110	if existing.node == "" && update.node != "" {
111		m.ui.Output(fmt.Sprintf("Evaluation triggered by node %q",
112			limit(update.node, m.length)))
113	}
114
115	// Check if the evaluation was triggered by a job
116	if existing.job == "" && update.job != "" {
117		m.ui.Output(fmt.Sprintf("Evaluation triggered by job %q", update.job))
118	}
119
120	// Check if the evaluation was triggered by a deployment
121	if existing.deployment == "" && update.deployment != "" {
122		m.ui.Output(fmt.Sprintf("Evaluation within deployment: %q", limit(update.deployment, m.length)))
123	}
124
125	// Check the allocations
126	for allocID, alloc := range update.allocs {
127		if existing, ok := existing.allocs[allocID]; !ok {
128			switch {
129			case alloc.index < update.index:
130				// New alloc with create index lower than the eval
131				// create index indicates modification
132				m.ui.Output(fmt.Sprintf(
133					"Allocation %q modified: node %q, group %q",
134					limit(alloc.id, m.length), limit(alloc.node, m.length), alloc.group))
135
136			case alloc.desired == structs.AllocDesiredStatusRun:
137				// New allocation with desired status running
138				m.ui.Output(fmt.Sprintf(
139					"Allocation %q created: node %q, group %q",
140					limit(alloc.id, m.length), limit(alloc.node, m.length), alloc.group))
141			}
142		} else {
143			switch {
144			case existing.client != alloc.client:
145				description := ""
146				if alloc.clientDesc != "" {
147					description = fmt.Sprintf(" (%s)", alloc.clientDesc)
148				}
149				// Allocation status has changed
150				m.ui.Output(fmt.Sprintf(
151					"Allocation %q status changed: %q -> %q%s",
152					limit(alloc.id, m.length), existing.client, alloc.client, description))
153			}
154		}
155	}
156
157	// Check if the status changed. We skip any transitions to pending status.
158	if existing.status != "" &&
159		update.status != structs.AllocClientStatusPending &&
160		existing.status != update.status {
161		m.ui.Output(fmt.Sprintf("Evaluation status changed: %q -> %q",
162			existing.status, update.status))
163	}
164}
165
166// monitor is used to start monitoring the given evaluation ID. It
167// writes output directly to the monitor's ui, and returns the
168// exit code for the command. If allowPrefix is false, monitor will only accept
169// exact matching evalIDs.
170//
171// The return code will be 0 on successful evaluation. If there are
172// problems scheduling the job (impossible constraints, resources
173// exhausted, etc), then the return code will be 2. For any other
174// failures (API connectivity, internal errors, etc), the return code
175// will be 1.
176func (m *monitor) monitor(evalID string, allowPrefix bool) int {
177	// Track if we encounter a scheduling failure. This can only be
178	// detected while querying allocations, so we use this bool to
179	// carry that status into the return code.
180	var schedFailure bool
181
182	// The user may have specified a prefix as eval id. We need to lookup the
183	// full id from the database first. Since we do this in a loop we need a
184	// variable to keep track if we've already written the header message.
185	var headerWritten bool
186
187	// Add the initial pending state
188	m.update(newEvalState())
189
190	for {
191		// Query the evaluation
192		eval, _, err := m.client.Evaluations().Info(evalID, nil)
193		if err != nil {
194			if !allowPrefix {
195				m.ui.Error(fmt.Sprintf("No evaluation with id %q found", evalID))
196				return 1
197			}
198			if len(evalID) == 1 {
199				m.ui.Error(fmt.Sprintf("Identifier must contain at least two characters."))
200				return 1
201			}
202
203			evalID = sanitizeUUIDPrefix(evalID)
204			evals, _, err := m.client.Evaluations().PrefixList(evalID)
205			if err != nil {
206				m.ui.Error(fmt.Sprintf("Error reading evaluation: %s", err))
207				return 1
208			}
209			if len(evals) == 0 {
210				m.ui.Error(fmt.Sprintf("No evaluation(s) with prefix or id %q found", evalID))
211				return 1
212			}
213			if len(evals) > 1 {
214				// Format the evaluations
215				out := make([]string, len(evals)+1)
216				out[0] = "ID|Priority|Type|Triggered By|Status"
217				for i, eval := range evals {
218					out[i+1] = fmt.Sprintf("%s|%d|%s|%s|%s",
219						limit(eval.ID, m.length),
220						eval.Priority,
221						eval.Type,
222						eval.TriggeredBy,
223						eval.Status)
224				}
225				m.ui.Output(fmt.Sprintf("Prefix matched multiple evaluations\n\n%s", formatList(out)))
226				return 0
227			}
228			// Prefix lookup matched a single evaluation
229			eval, _, err = m.client.Evaluations().Info(evals[0].ID, nil)
230			if err != nil {
231				m.ui.Error(fmt.Sprintf("Error reading evaluation: %s", err))
232			}
233		}
234
235		if !headerWritten {
236			m.ui.Info(fmt.Sprintf("Monitoring evaluation %q", limit(eval.ID, m.length)))
237			headerWritten = true
238		}
239
240		// Create the new eval state.
241		state := newEvalState()
242		state.status = eval.Status
243		state.desc = eval.StatusDescription
244		state.node = eval.NodeID
245		state.job = eval.JobID
246		state.deployment = eval.DeploymentID
247		state.wait = eval.Wait
248		state.index = eval.CreateIndex
249
250		// Query the allocations associated with the evaluation
251		allocs, _, err := m.client.Evaluations().Allocations(eval.ID, nil)
252		if err != nil {
253			m.ui.Error(fmt.Sprintf("Error reading allocations: %s", err))
254			return 1
255		}
256
257		// Add the allocs to the state
258		for _, alloc := range allocs {
259			state.allocs[alloc.ID] = &allocState{
260				id:          alloc.ID,
261				group:       alloc.TaskGroup,
262				node:        alloc.NodeID,
263				desired:     alloc.DesiredStatus,
264				desiredDesc: alloc.DesiredDescription,
265				client:      alloc.ClientStatus,
266				clientDesc:  alloc.ClientDescription,
267				index:       alloc.CreateIndex,
268			}
269		}
270
271		// Update the state
272		m.update(state)
273
274		switch eval.Status {
275		case structs.EvalStatusComplete, structs.EvalStatusFailed, structs.EvalStatusCancelled:
276			if len(eval.FailedTGAllocs) == 0 {
277				m.ui.Info(fmt.Sprintf("Evaluation %q finished with status %q",
278					limit(eval.ID, m.length), eval.Status))
279			} else {
280				// There were failures making the allocations
281				schedFailure = true
282				m.ui.Info(fmt.Sprintf("Evaluation %q finished with status %q but failed to place all allocations:",
283					limit(eval.ID, m.length), eval.Status))
284
285				// Print the failures per task group
286				for tg, metrics := range eval.FailedTGAllocs {
287					noun := "allocation"
288					if metrics.CoalescedFailures > 0 {
289						noun += "s"
290					}
291					m.ui.Output(fmt.Sprintf("Task Group %q (failed to place %d %s):", tg, metrics.CoalescedFailures+1, noun))
292					metrics := formatAllocMetrics(metrics, false, "  ")
293					for _, line := range strings.Split(metrics, "\n") {
294						m.ui.Output(line)
295					}
296				}
297
298				if eval.BlockedEval != "" {
299					m.ui.Output(fmt.Sprintf("Evaluation %q waiting for additional capacity to place remainder",
300						limit(eval.BlockedEval, m.length)))
301				}
302			}
303		default:
304			// Wait for the next update
305			time.Sleep(updateWait)
306			continue
307		}
308
309		// Monitor the next eval in the chain, if present
310		if eval.NextEval != "" {
311			if eval.Wait.Nanoseconds() != 0 {
312				m.ui.Info(fmt.Sprintf(
313					"Monitoring next evaluation %q in %s",
314					limit(eval.NextEval, m.length), eval.Wait))
315
316				// Skip some unnecessary polling
317				time.Sleep(eval.Wait)
318			}
319
320			// Reset the state and monitor the new eval
321			m.state = newEvalState()
322			return m.monitor(eval.NextEval, allowPrefix)
323		}
324		break
325	}
326
327	// Treat scheduling failures specially using a dedicated exit code.
328	// This makes it easier to detect failures from the CLI.
329	if schedFailure {
330		return 2
331	}
332
333	return 0
334}
335
336func formatAllocMetrics(metrics *api.AllocationMetric, scores bool, prefix string) string {
337	// Print a helpful message if we have an eligibility problem
338	var out string
339	if metrics.NodesEvaluated == 0 {
340		out += fmt.Sprintf("%s* No nodes were eligible for evaluation\n", prefix)
341	}
342
343	// Print a helpful message if the user has asked for a DC that has no
344	// available nodes.
345	for dc, available := range metrics.NodesAvailable {
346		if available == 0 {
347			out += fmt.Sprintf("%s* No nodes are available in datacenter %q\n", prefix, dc)
348		}
349	}
350
351	// Print filter info
352	for class, num := range metrics.ClassFiltered {
353		out += fmt.Sprintf("%s* Class %q filtered %d nodes\n", prefix, class, num)
354	}
355	for cs, num := range metrics.ConstraintFiltered {
356		out += fmt.Sprintf("%s* Constraint %q filtered %d nodes\n", prefix, cs, num)
357	}
358
359	// Print exhaustion info
360	if ne := metrics.NodesExhausted; ne > 0 {
361		out += fmt.Sprintf("%s* Resources exhausted on %d nodes\n", prefix, ne)
362	}
363	for class, num := range metrics.ClassExhausted {
364		out += fmt.Sprintf("%s* Class %q exhausted on %d nodes\n", prefix, class, num)
365	}
366	for dim, num := range metrics.DimensionExhausted {
367		out += fmt.Sprintf("%s* Dimension %q exhausted on %d nodes\n", prefix, dim, num)
368	}
369
370	// Print quota info
371	for _, dim := range metrics.QuotaExhausted {
372		out += fmt.Sprintf("%s* Quota limit hit %q\n", prefix, dim)
373	}
374
375	// Print scores
376	if scores {
377		if len(metrics.ScoreMetaData) > 0 {
378			scoreOutput := make([]string, len(metrics.ScoreMetaData)+1)
379			var scorerNames []string
380			for i, scoreMeta := range metrics.ScoreMetaData {
381				// Add header as first row
382				if i == 0 {
383					scoreOutput[0] = "Node|"
384
385					// sort scores alphabetically
386					scores := make([]string, 0, len(scoreMeta.Scores))
387					for score := range scoreMeta.Scores {
388						scores = append(scores, score)
389					}
390					sort.Strings(scores)
391
392					// build score header output
393					for _, scorerName := range scores {
394						scoreOutput[0] += fmt.Sprintf("%v|", scorerName)
395						scorerNames = append(scorerNames, scorerName)
396					}
397					scoreOutput[0] += "final score"
398				}
399				scoreOutput[i+1] = fmt.Sprintf("%v|", scoreMeta.NodeID)
400				for _, scorerName := range scorerNames {
401					scoreVal := scoreMeta.Scores[scorerName]
402					scoreOutput[i+1] += fmt.Sprintf("%.3g|", scoreVal)
403				}
404				scoreOutput[i+1] += fmt.Sprintf("%.3g", scoreMeta.NormScore)
405			}
406			out += formatList(scoreOutput)
407		} else {
408			// Backwards compatibility for old allocs
409			for name, score := range metrics.Scores {
410				out += fmt.Sprintf("%s* Score %q = %f\n", prefix, name, score)
411			}
412		}
413	}
414
415	out = strings.TrimSuffix(out, "\n")
416	return out
417}
418