1package nomad
2
3import (
4	"fmt"
5	"time"
6
7	metrics "github.com/armon/go-metrics"
8	log "github.com/hashicorp/go-hclog"
9	memdb "github.com/hashicorp/go-memdb"
10
11	"github.com/hashicorp/nomad/acl"
12	"github.com/hashicorp/nomad/nomad/state"
13	"github.com/hashicorp/nomad/nomad/structs"
14)
15
// Deployment endpoint is used for manipulating deployments. Each method
// passes its own "Deployment.<Method>" name to srv.forward before doing any
// other work.
type Deployment struct {
	srv    *Server    // server the handlers delegate to (forwarding, ACLs, state, Raft, deployment watcher)
	logger log.Logger // not referenced by the handlers visible in this file
}
21
22// GetDeployment is used to request information about a specific deployment
23func (d *Deployment) GetDeployment(args *structs.DeploymentSpecificRequest,
24	reply *structs.SingleDeploymentResponse) error {
25	if done, err := d.srv.forward("Deployment.GetDeployment", args, args, reply); done {
26		return err
27	}
28	defer metrics.MeasureSince([]string{"nomad", "deployment", "get_deployment"}, time.Now())
29
30	// Check namespace read-job permissions
31	allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityReadJob)
32	aclObj, err := d.srv.ResolveToken(args.AuthToken)
33	if err != nil {
34		return err
35	} else if !allowNsOp(aclObj, args.RequestNamespace()) {
36		return structs.ErrPermissionDenied
37	}
38
39	// Setup the blocking query
40	opts := blockingOptions{
41		queryOpts: &args.QueryOptions,
42		queryMeta: &reply.QueryMeta,
43		run: func(ws memdb.WatchSet, state *state.StateStore) error {
44			// Verify the arguments
45			if args.DeploymentID == "" {
46				return fmt.Errorf("missing deployment ID")
47			}
48
49			// Look for the deployment
50			out, err := state.DeploymentByID(ws, args.DeploymentID)
51			if err != nil {
52				return err
53			}
54
55			// Re-check namespace in case it differs from request.
56			if out != nil && !allowNsOp(aclObj, out.Namespace) {
57				// hide this deployment, caller is not authorized to view it
58				out = nil
59			}
60
61			// Setup the output
62			reply.Deployment = out
63			if out != nil {
64				reply.Index = out.ModifyIndex
65			} else {
66				// Use the last index that affected the deployments table
67				index, err := state.Index("deployment")
68				if err != nil {
69					return err
70				}
71				reply.Index = index
72			}
73
74			// Set the query response
75			d.srv.setQueryMeta(&reply.QueryMeta)
76			return nil
77		}}
78	return d.srv.blockingRPC(&opts)
79}
80
81// Fail is used to force fail a deployment
82func (d *Deployment) Fail(args *structs.DeploymentFailRequest, reply *structs.DeploymentUpdateResponse) error {
83	if done, err := d.srv.forward("Deployment.Fail", args, args, reply); done {
84		return err
85	}
86	defer metrics.MeasureSince([]string{"nomad", "deployment", "fail"}, time.Now())
87
88	// Validate the arguments
89	if args.DeploymentID == "" {
90		return fmt.Errorf("missing deployment ID")
91	}
92
93	// Lookup the deployment
94	snap, err := d.srv.fsm.State().Snapshot()
95	if err != nil {
96		return err
97	}
98
99	ws := memdb.NewWatchSet()
100	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
101	if err != nil {
102		return err
103	}
104	if deploy == nil {
105		return fmt.Errorf("deployment not found")
106	}
107
108	// Check namespace submit-job permissions
109	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
110		return err
111	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
112		return structs.ErrPermissionDenied
113	}
114
115	if !deploy.Active() {
116		return structs.ErrDeploymentTerminalNoFail
117	}
118
119	// Call into the deployment watcher
120	return d.srv.deploymentWatcher.FailDeployment(args, reply)
121}
122
123// Pause is used to pause a deployment
124func (d *Deployment) Pause(args *structs.DeploymentPauseRequest, reply *structs.DeploymentUpdateResponse) error {
125	if done, err := d.srv.forward("Deployment.Pause", args, args, reply); done {
126		return err
127	}
128	defer metrics.MeasureSince([]string{"nomad", "deployment", "pause"}, time.Now())
129
130	// Validate the arguments
131	if args.DeploymentID == "" {
132		return fmt.Errorf("missing deployment ID")
133	}
134
135	// Lookup the deployment
136	snap, err := d.srv.fsm.State().Snapshot()
137	if err != nil {
138		return err
139	}
140
141	ws := memdb.NewWatchSet()
142	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
143	if err != nil {
144		return err
145	}
146	if deploy == nil {
147		return fmt.Errorf("deployment not found")
148	}
149
150	// Check namespace submit-job permissions
151	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
152		return err
153	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
154		return structs.ErrPermissionDenied
155	}
156
157	if !deploy.Active() {
158		if args.Pause {
159			return structs.ErrDeploymentTerminalNoPause
160		}
161
162		return structs.ErrDeploymentTerminalNoResume
163	}
164
165	// Call into the deployment watcher
166	return d.srv.deploymentWatcher.PauseDeployment(args, reply)
167}
168
169// Promote is used to promote canaries in a deployment
170func (d *Deployment) Promote(args *structs.DeploymentPromoteRequest, reply *structs.DeploymentUpdateResponse) error {
171	if done, err := d.srv.forward("Deployment.Promote", args, args, reply); done {
172		return err
173	}
174	defer metrics.MeasureSince([]string{"nomad", "deployment", "promote"}, time.Now())
175
176	// Validate the arguments
177	if args.DeploymentID == "" {
178		return fmt.Errorf("missing deployment ID")
179	}
180
181	// Lookup the deployment
182	snap, err := d.srv.fsm.State().Snapshot()
183	if err != nil {
184		return err
185	}
186
187	ws := memdb.NewWatchSet()
188	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
189	if err != nil {
190		return err
191	}
192	if deploy == nil {
193		return fmt.Errorf("deployment not found")
194	}
195
196	// Check namespace submit-job permissions
197	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
198		return err
199	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
200		return structs.ErrPermissionDenied
201	}
202
203	if !deploy.Active() {
204		return structs.ErrDeploymentTerminalNoPromote
205	}
206
207	// Call into the deployment watcher
208	return d.srv.deploymentWatcher.PromoteDeployment(args, reply)
209}
210
211// Run is used to start a pending deployment
212func (d *Deployment) Run(args *structs.DeploymentRunRequest, reply *structs.DeploymentUpdateResponse) error {
213	if done, err := d.srv.forward("Deployment.Run", args, args, reply); done {
214		return err
215	}
216	defer metrics.MeasureSince([]string{"nomad", "deployment", "run"}, time.Now())
217
218	// Validate the arguments
219	if args.DeploymentID == "" {
220		return fmt.Errorf("missing deployment ID")
221	}
222
223	// Lookup the deployment
224	snap, err := d.srv.fsm.State().Snapshot()
225	if err != nil {
226		return err
227	}
228
229	ws := memdb.NewWatchSet()
230	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
231	if err != nil {
232		return err
233	}
234	if deploy == nil {
235		return fmt.Errorf("deployment not found")
236	}
237
238	// Check namespace submit-job permissions
239	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
240		return err
241	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
242		return structs.ErrPermissionDenied
243	}
244
245	if !deploy.Active() {
246		return structs.ErrDeploymentTerminalNoRun
247	}
248
249	// Call into the deployment watcher
250	return d.srv.deploymentWatcher.RunDeployment(args, reply)
251}
252
253// Unblock is used to unblock a deployment
254func (d *Deployment) Unblock(args *structs.DeploymentUnblockRequest, reply *structs.DeploymentUpdateResponse) error {
255	if done, err := d.srv.forward("Deployment.Unblock", args, args, reply); done {
256		return err
257	}
258	defer metrics.MeasureSince([]string{"nomad", "deployment", "unblock"}, time.Now())
259
260	// Validate the arguments
261	if args.DeploymentID == "" {
262		return fmt.Errorf("missing deployment ID")
263	}
264
265	// Lookup the deployment
266	snap, err := d.srv.fsm.State().Snapshot()
267	if err != nil {
268		return err
269	}
270
271	ws := memdb.NewWatchSet()
272	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
273	if err != nil {
274		return err
275	}
276	if deploy == nil {
277		return fmt.Errorf("deployment not found")
278	}
279
280	// Check namespace submit-job permissions
281	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
282		return err
283	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
284		return structs.ErrPermissionDenied
285	}
286
287	if !deploy.Active() {
288		return structs.ErrDeploymentTerminalNoUnblock
289	}
290
291	// Call into the deployment watcher
292	return d.srv.deploymentWatcher.UnblockDeployment(args, reply)
293}
294
295// Cancel is used to cancel a deployment
296func (d *Deployment) Cancel(args *structs.DeploymentCancelRequest, reply *structs.DeploymentUpdateResponse) error {
297	if done, err := d.srv.forward("Deployment.Cancel", args, args, reply); done {
298		return err
299	}
300	defer metrics.MeasureSince([]string{"nomad", "deployment", "cancel"}, time.Now())
301
302	// Validate the arguments
303	if args.DeploymentID == "" {
304		return fmt.Errorf("missing deployment ID")
305	}
306
307	// Lookup the deployment
308	snap, err := d.srv.fsm.State().Snapshot()
309	if err != nil {
310		return err
311	}
312
313	ws := memdb.NewWatchSet()
314	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
315	if err != nil {
316		return err
317	}
318	if deploy == nil {
319		return fmt.Errorf("deployment not found")
320	}
321
322	// Check namespace submit-job permissions
323	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
324		return err
325	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
326		return structs.ErrPermissionDenied
327	}
328
329	if !deploy.Active() {
330		return structs.ErrDeploymentTerminalNoCancel
331	}
332
333	// Call into the deployment watcher
334	return d.srv.deploymentWatcher.CancelDeployment(args, reply)
335}
336
337// SetAllocHealth is used to set the health of allocations that are part of the
338// deployment.
339func (d *Deployment) SetAllocHealth(args *structs.DeploymentAllocHealthRequest, reply *structs.DeploymentUpdateResponse) error {
340	if done, err := d.srv.forward("Deployment.SetAllocHealth", args, args, reply); done {
341		return err
342	}
343	defer metrics.MeasureSince([]string{"nomad", "deployment", "set_alloc_health"}, time.Now())
344
345	// Validate the arguments
346	if args.DeploymentID == "" {
347		return fmt.Errorf("missing deployment ID")
348	}
349
350	if len(args.HealthyAllocationIDs)+len(args.UnhealthyAllocationIDs) == 0 {
351		return fmt.Errorf("must specify at least one healthy/unhealthy allocation ID")
352	}
353
354	// Lookup the deployment
355	snap, err := d.srv.fsm.State().Snapshot()
356	if err != nil {
357		return err
358	}
359
360	ws := memdb.NewWatchSet()
361	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
362	if err != nil {
363		return err
364	}
365	if deploy == nil {
366		return fmt.Errorf("deployment not found")
367	}
368
369	// Check namespace submit-job permissions
370	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
371		return err
372	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
373		return structs.ErrPermissionDenied
374	}
375
376	if !deploy.Active() {
377		return structs.ErrDeploymentTerminalNoSetHealth
378	}
379
380	// Call into the deployment watcher
381	return d.srv.deploymentWatcher.SetAllocHealth(args, reply)
382}
383
384// List returns the list of deployments in the system
385func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error {
386	if done, err := d.srv.forward("Deployment.List", args, args, reply); done {
387		return err
388	}
389	defer metrics.MeasureSince([]string{"nomad", "deployment", "list"}, time.Now())
390
391	// Check namespace read-job permissions against request namespace since
392	// results are filtered by request namespace.
393	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
394		return err
395	} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
396		return structs.ErrPermissionDenied
397	}
398
399	// Setup the blocking query
400	opts := blockingOptions{
401		queryOpts: &args.QueryOptions,
402		queryMeta: &reply.QueryMeta,
403		run: func(ws memdb.WatchSet, state *state.StateStore) error {
404			// Capture all the deployments
405			var err error
406			var iter memdb.ResultIterator
407			if prefix := args.QueryOptions.Prefix; prefix != "" {
408				iter, err = state.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix)
409			} else {
410				iter, err = state.DeploymentsByNamespace(ws, args.RequestNamespace())
411			}
412			if err != nil {
413				return err
414			}
415
416			var deploys []*structs.Deployment
417			for {
418				raw := iter.Next()
419				if raw == nil {
420					break
421				}
422				deploy := raw.(*structs.Deployment)
423				deploys = append(deploys, deploy)
424			}
425			reply.Deployments = deploys
426
427			// Use the last index that affected the deployment table
428			index, err := state.Index("deployment")
429			if err != nil {
430				return err
431			}
432			reply.Index = index
433
434			// Set the query response
435			d.srv.setQueryMeta(&reply.QueryMeta)
436			return nil
437		}}
438	return d.srv.blockingRPC(&opts)
439}
440
441// Allocations returns the list of allocations that are a part of the deployment
442func (d *Deployment) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error {
443	if done, err := d.srv.forward("Deployment.Allocations", args, args, reply); done {
444		return err
445	}
446	defer metrics.MeasureSince([]string{"nomad", "deployment", "allocations"}, time.Now())
447
448	// Check namespace read-job permissions against the request namespace.
449	// Must re-check against the alloc namespace when they return to ensure
450	// there's no namespace mismatch.
451	allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityReadJob)
452	aclObj, err := d.srv.ResolveToken(args.AuthToken)
453	if err != nil {
454		return err
455	} else if !allowNsOp(aclObj, args.RequestNamespace()) {
456		return structs.ErrPermissionDenied
457	}
458
459	// Setup the blocking query
460	opts := blockingOptions{
461		queryOpts: &args.QueryOptions,
462		queryMeta: &reply.QueryMeta,
463		run: func(ws memdb.WatchSet, state *state.StateStore) error {
464			// Capture all the allocations
465			allocs, err := state.AllocsByDeployment(ws, args.DeploymentID)
466			if err != nil {
467				return err
468			}
469
470			// Deployments do not span namespaces so just check the
471			// first allocs namespace.
472			if len(allocs) > 0 {
473				ns := allocs[0].Namespace
474				if ns != args.RequestNamespace() && !allowNsOp(aclObj, ns) {
475					return structs.ErrPermissionDenied
476				}
477			}
478
479			stubs := make([]*structs.AllocListStub, 0, len(allocs))
480			for _, alloc := range allocs {
481				stubs = append(stubs, alloc.Stub(nil))
482			}
483			reply.Allocations = stubs
484
485			// Use the last index that affected the jobs table
486			index, err := state.Index("allocs")
487			if err != nil {
488				return err
489			}
490			reply.Index = index
491
492			// Set the query response
493			d.srv.setQueryMeta(&reply.QueryMeta)
494			return nil
495		}}
496	return d.srv.blockingRPC(&opts)
497}
498
499// Reap is used to cleanup terminal deployments
500func (d *Deployment) Reap(args *structs.DeploymentDeleteRequest,
501	reply *structs.GenericResponse) error {
502	if done, err := d.srv.forward("Deployment.Reap", args, args, reply); done {
503		return err
504	}
505	defer metrics.MeasureSince([]string{"nomad", "deployment", "reap"}, time.Now())
506
507	// Update via Raft
508	_, index, err := d.srv.raftApply(structs.DeploymentDeleteRequestType, args)
509	if err != nil {
510		return err
511	}
512
513	// Update the index
514	reply.Index = index
515	return nil
516}
517