1package nomad
2
3import (
4	"fmt"
5	"time"
6
7	metrics "github.com/armon/go-metrics"
8	log "github.com/hashicorp/go-hclog"
9	memdb "github.com/hashicorp/go-memdb"
10
11	"github.com/hashicorp/nomad/acl"
12	"github.com/hashicorp/nomad/nomad/state"
13	"github.com/hashicorp/nomad/nomad/structs"
14)
15
16// Deployment endpoint is used for manipulating deployments
17type Deployment struct {
18	srv    *Server
19	logger log.Logger
20}
21
22// GetDeployment is used to request information about a specific deployment
23func (d *Deployment) GetDeployment(args *structs.DeploymentSpecificRequest,
24	reply *structs.SingleDeploymentResponse) error {
25	if done, err := d.srv.forward("Deployment.GetDeployment", args, args, reply); done {
26		return err
27	}
28	defer metrics.MeasureSince([]string{"nomad", "deployment", "get_deployment"}, time.Now())
29
30	// Check namespace read-job permissions
31	allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityReadJob)
32	aclObj, err := d.srv.ResolveToken(args.AuthToken)
33	if err != nil {
34		return err
35	} else if !allowNsOp(aclObj, args.RequestNamespace()) {
36		return structs.ErrPermissionDenied
37	}
38
39	// Setup the blocking query
40	opts := blockingOptions{
41		queryOpts: &args.QueryOptions,
42		queryMeta: &reply.QueryMeta,
43		run: func(ws memdb.WatchSet, state *state.StateStore) error {
44			// Verify the arguments
45			if args.DeploymentID == "" {
46				return fmt.Errorf("missing deployment ID")
47			}
48
49			// Look for the deployment
50			out, err := state.DeploymentByID(ws, args.DeploymentID)
51			if err != nil {
52				return err
53			}
54
55			// Setup the output
56			reply.Deployment = out
57			if out != nil {
58				// Re-check namespace in case it differs from request.
59				if !allowNsOp(aclObj, out.Namespace) {
60					return structs.NewErrUnknownAllocation(args.DeploymentID)
61				}
62
63				reply.Index = out.ModifyIndex
64			} else {
65				// Use the last index that affected the deployments table
66				index, err := state.Index("deployment")
67				if err != nil {
68					return err
69				}
70				reply.Index = index
71			}
72
73			// Set the query response
74			d.srv.setQueryMeta(&reply.QueryMeta)
75			return nil
76		}}
77	return d.srv.blockingRPC(&opts)
78}
79
80// Fail is used to force fail a deployment
81func (d *Deployment) Fail(args *structs.DeploymentFailRequest, reply *structs.DeploymentUpdateResponse) error {
82	if done, err := d.srv.forward("Deployment.Fail", args, args, reply); done {
83		return err
84	}
85	defer metrics.MeasureSince([]string{"nomad", "deployment", "fail"}, time.Now())
86
87	// Validate the arguments
88	if args.DeploymentID == "" {
89		return fmt.Errorf("missing deployment ID")
90	}
91
92	// Lookup the deployment
93	snap, err := d.srv.fsm.State().Snapshot()
94	if err != nil {
95		return err
96	}
97
98	ws := memdb.NewWatchSet()
99	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
100	if err != nil {
101		return err
102	}
103	if deploy == nil {
104		return fmt.Errorf("deployment not found")
105	}
106
107	// Check namespace submit-job permissions
108	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
109		return err
110	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
111		return structs.ErrPermissionDenied
112	}
113
114	if !deploy.Active() {
115		return fmt.Errorf("can't fail terminal deployment")
116	}
117
118	// Call into the deployment watcher
119	return d.srv.deploymentWatcher.FailDeployment(args, reply)
120}
121
122// Pause is used to pause a deployment
123func (d *Deployment) Pause(args *structs.DeploymentPauseRequest, reply *structs.DeploymentUpdateResponse) error {
124	if done, err := d.srv.forward("Deployment.Pause", args, args, reply); done {
125		return err
126	}
127	defer metrics.MeasureSince([]string{"nomad", "deployment", "pause"}, time.Now())
128
129	// Validate the arguments
130	if args.DeploymentID == "" {
131		return fmt.Errorf("missing deployment ID")
132	}
133
134	// Lookup the deployment
135	snap, err := d.srv.fsm.State().Snapshot()
136	if err != nil {
137		return err
138	}
139
140	ws := memdb.NewWatchSet()
141	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
142	if err != nil {
143		return err
144	}
145	if deploy == nil {
146		return fmt.Errorf("deployment not found")
147	}
148
149	// Check namespace submit-job permissions
150	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
151		return err
152	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
153		return structs.ErrPermissionDenied
154	}
155
156	if !deploy.Active() {
157		if args.Pause {
158			return fmt.Errorf("can't pause terminal deployment")
159		}
160
161		return fmt.Errorf("can't resume terminal deployment")
162	}
163
164	// Call into the deployment watcher
165	return d.srv.deploymentWatcher.PauseDeployment(args, reply)
166}
167
168// Promote is used to promote canaries in a deployment
169func (d *Deployment) Promote(args *structs.DeploymentPromoteRequest, reply *structs.DeploymentUpdateResponse) error {
170	if done, err := d.srv.forward("Deployment.Promote", args, args, reply); done {
171		return err
172	}
173	defer metrics.MeasureSince([]string{"nomad", "deployment", "promote"}, time.Now())
174
175	// Validate the arguments
176	if args.DeploymentID == "" {
177		return fmt.Errorf("missing deployment ID")
178	}
179
180	// Lookup the deployment
181	snap, err := d.srv.fsm.State().Snapshot()
182	if err != nil {
183		return err
184	}
185
186	ws := memdb.NewWatchSet()
187	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
188	if err != nil {
189		return err
190	}
191	if deploy == nil {
192		return fmt.Errorf("deployment not found")
193	}
194
195	// Check namespace submit-job permissions
196	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
197		return err
198	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
199		return structs.ErrPermissionDenied
200	}
201
202	if !deploy.Active() {
203		return fmt.Errorf("can't promote terminal deployment")
204	}
205
206	// Call into the deployment watcher
207	return d.srv.deploymentWatcher.PromoteDeployment(args, reply)
208}
209
210// SetAllocHealth is used to set the health of allocations that are part of the
211// deployment.
212func (d *Deployment) SetAllocHealth(args *structs.DeploymentAllocHealthRequest, reply *structs.DeploymentUpdateResponse) error {
213	if done, err := d.srv.forward("Deployment.SetAllocHealth", args, args, reply); done {
214		return err
215	}
216	defer metrics.MeasureSince([]string{"nomad", "deployment", "set_alloc_health"}, time.Now())
217
218	// Validate the arguments
219	if args.DeploymentID == "" {
220		return fmt.Errorf("missing deployment ID")
221	}
222
223	if len(args.HealthyAllocationIDs)+len(args.UnhealthyAllocationIDs) == 0 {
224		return fmt.Errorf("must specify at least one healthy/unhealthy allocation ID")
225	}
226
227	// Lookup the deployment
228	snap, err := d.srv.fsm.State().Snapshot()
229	if err != nil {
230		return err
231	}
232
233	ws := memdb.NewWatchSet()
234	deploy, err := snap.DeploymentByID(ws, args.DeploymentID)
235	if err != nil {
236		return err
237	}
238	if deploy == nil {
239		return fmt.Errorf("deployment not found")
240	}
241
242	// Check namespace submit-job permissions
243	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
244		return err
245	} else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) {
246		return structs.ErrPermissionDenied
247	}
248
249	if !deploy.Active() {
250		return fmt.Errorf("can't set health of allocations for a terminal deployment")
251	}
252
253	// Call into the deployment watcher
254	return d.srv.deploymentWatcher.SetAllocHealth(args, reply)
255}
256
257// List returns the list of deployments in the system
258func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error {
259	if done, err := d.srv.forward("Deployment.List", args, args, reply); done {
260		return err
261	}
262	defer metrics.MeasureSince([]string{"nomad", "deployment", "list"}, time.Now())
263
264	// Check namespace read-job permissions against request namespace since
265	// results are filtered by request namespace.
266	if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil {
267		return err
268	} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
269		return structs.ErrPermissionDenied
270	}
271
272	// Setup the blocking query
273	opts := blockingOptions{
274		queryOpts: &args.QueryOptions,
275		queryMeta: &reply.QueryMeta,
276		run: func(ws memdb.WatchSet, state *state.StateStore) error {
277			// Capture all the deployments
278			var err error
279			var iter memdb.ResultIterator
280			if prefix := args.QueryOptions.Prefix; prefix != "" {
281				iter, err = state.DeploymentsByIDPrefix(ws, args.RequestNamespace(), prefix)
282			} else {
283				iter, err = state.DeploymentsByNamespace(ws, args.RequestNamespace())
284			}
285			if err != nil {
286				return err
287			}
288
289			var deploys []*structs.Deployment
290			for {
291				raw := iter.Next()
292				if raw == nil {
293					break
294				}
295				deploy := raw.(*structs.Deployment)
296				deploys = append(deploys, deploy)
297			}
298			reply.Deployments = deploys
299
300			// Use the last index that affected the deployment table
301			index, err := state.Index("deployment")
302			if err != nil {
303				return err
304			}
305			reply.Index = index
306
307			// Set the query response
308			d.srv.setQueryMeta(&reply.QueryMeta)
309			return nil
310		}}
311	return d.srv.blockingRPC(&opts)
312}
313
314// Allocations returns the list of allocations that are a part of the deployment
315func (d *Deployment) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error {
316	if done, err := d.srv.forward("Deployment.Allocations", args, args, reply); done {
317		return err
318	}
319	defer metrics.MeasureSince([]string{"nomad", "deployment", "allocations"}, time.Now())
320
321	// Check namespace read-job permissions against the request namespace.
322	// Must re-check against the alloc namespace when they return to ensure
323	// there's no namespace mismatch.
324	allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityReadJob)
325	aclObj, err := d.srv.ResolveToken(args.AuthToken)
326	if err != nil {
327		return err
328	} else if !allowNsOp(aclObj, args.RequestNamespace()) {
329		return structs.ErrPermissionDenied
330	}
331
332	// Setup the blocking query
333	opts := blockingOptions{
334		queryOpts: &args.QueryOptions,
335		queryMeta: &reply.QueryMeta,
336		run: func(ws memdb.WatchSet, state *state.StateStore) error {
337			// Capture all the allocations
338			allocs, err := state.AllocsByDeployment(ws, args.DeploymentID)
339			if err != nil {
340				return err
341			}
342
343			// Deployments do not span namespaces so just check the
344			// first allocs namespace.
345			if len(allocs) > 0 {
346				ns := allocs[0].Namespace
347				if ns != args.RequestNamespace() && !allowNsOp(aclObj, ns) {
348					return structs.ErrPermissionDenied
349				}
350			}
351
352			stubs := make([]*structs.AllocListStub, 0, len(allocs))
353			for _, alloc := range allocs {
354				stubs = append(stubs, alloc.Stub())
355			}
356			reply.Allocations = stubs
357
358			// Use the last index that affected the jobs table
359			index, err := state.Index("allocs")
360			if err != nil {
361				return err
362			}
363			reply.Index = index
364
365			// Set the query response
366			d.srv.setQueryMeta(&reply.QueryMeta)
367			return nil
368		}}
369	return d.srv.blockingRPC(&opts)
370}
371
372// Reap is used to cleanup terminal deployments
373func (d *Deployment) Reap(args *structs.DeploymentDeleteRequest,
374	reply *structs.GenericResponse) error {
375	if done, err := d.srv.forward("Deployment.Reap", args, args, reply); done {
376		return err
377	}
378	defer metrics.MeasureSince([]string{"nomad", "deployment", "reap"}, time.Now())
379
380	// Update via Raft
381	_, index, err := d.srv.raftApply(structs.DeploymentDeleteRequestType, args)
382	if err != nil {
383		return err
384	}
385
386	// Update the index
387	reply.Index = index
388	return nil
389}
390