1package nomad
2
3import (
4	"fmt"
5	"time"
6
7	metrics "github.com/armon/go-metrics"
8	log "github.com/hashicorp/go-hclog"
9	memdb "github.com/hashicorp/go-memdb"
10	multierror "github.com/hashicorp/go-multierror"
11
12	"github.com/hashicorp/nomad/acl"
13	"github.com/hashicorp/nomad/helper"
14	"github.com/hashicorp/nomad/helper/uuid"
15	"github.com/hashicorp/nomad/nomad/state"
16	"github.com/hashicorp/nomad/nomad/structs"
17)
18
// Alloc endpoint is used for manipulating allocations
type Alloc struct {
	// srv is the server this endpoint runs on; the methods use it to forward
	// requests, resolve ACL tokens, read state, run blocking queries and
	// apply Raft updates.
	srv    *Server
	// logger is used to report failed Raft applies.
	logger log.Logger
}
24
25// List is used to list the allocations in the system
26func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListResponse) error {
27	if done, err := a.srv.forward("Alloc.List", args, args, reply); done {
28		return err
29	}
30	defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now())
31
32	if args.RequestNamespace() == structs.AllNamespacesSentinel {
33		return a.listAllNamespaces(args, reply)
34	}
35
36	// Check namespace read-job permissions
37	aclObj, err := a.srv.ResolveToken(args.AuthToken)
38	if err != nil {
39		return err
40	} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
41		return structs.ErrPermissionDenied
42	}
43
44	// Setup the blocking query
45	opts := blockingOptions{
46		queryOpts: &args.QueryOptions,
47		queryMeta: &reply.QueryMeta,
48		run: func(ws memdb.WatchSet, state *state.StateStore) error {
49			// Capture all the allocations
50			var err error
51			var iter memdb.ResultIterator
52
53			prefix := args.QueryOptions.Prefix
54			if prefix != "" {
55				iter, err = state.AllocsByIDPrefix(ws, args.RequestNamespace(), prefix)
56			} else {
57				iter, err = state.AllocsByNamespace(ws, args.RequestNamespace())
58			}
59			if err != nil {
60				return err
61			}
62
63			var allocs []*structs.AllocListStub
64			for {
65				raw := iter.Next()
66				if raw == nil {
67					break
68				}
69				alloc := raw.(*structs.Allocation)
70				allocs = append(allocs, alloc.Stub(args.Fields))
71			}
72			reply.Allocations = allocs
73
74			// Use the last index that affected the jobs table
75			index, err := state.Index("allocs")
76			if err != nil {
77				return err
78			}
79			reply.Index = index
80
81			// Set the query response
82			a.srv.setQueryMeta(&reply.QueryMeta)
83			return nil
84		}}
85	return a.srv.blockingRPC(&opts)
86}
87
88// listAllNamespaces lists all allocations across all namespaces
89func (a *Alloc) listAllNamespaces(args *structs.AllocListRequest, reply *structs.AllocListResponse) error {
90	// Check for read-job permissions
91	aclObj, err := a.srv.ResolveToken(args.AuthToken)
92	if err != nil {
93		return err
94	}
95	prefix := args.QueryOptions.Prefix
96	allow := func(ns string) bool {
97		return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)
98	}
99
100	// Setup the blocking query
101	opts := blockingOptions{
102		queryOpts: &args.QueryOptions,
103		queryMeta: &reply.QueryMeta,
104		run: func(ws memdb.WatchSet, state *state.StateStore) error {
105			// get list of accessible namespaces
106			allowedNSes, err := allowedNSes(aclObj, state, allow)
107			if err == structs.ErrPermissionDenied {
108				// return empty allocations if token isn't authorized for any
109				// namespace, matching other endpoints
110				reply.Allocations = []*structs.AllocListStub{}
111			} else if err != nil {
112				return err
113			} else {
114				var iter memdb.ResultIterator
115				var err error
116				if prefix != "" {
117					iter, err = state.AllocsByIDPrefixAllNSs(ws, prefix)
118				} else {
119					iter, err = state.Allocs(ws)
120				}
121				if err != nil {
122					return err
123				}
124
125				var allocs []*structs.AllocListStub
126				for raw := iter.Next(); raw != nil; raw = iter.Next() {
127					alloc := raw.(*structs.Allocation)
128					if allowedNSes != nil && !allowedNSes[alloc.Namespace] {
129						continue
130					}
131					allocs = append(allocs, alloc.Stub(args.Fields))
132				}
133				reply.Allocations = allocs
134			}
135
136			// Use the last index that affected the jobs table
137			index, err := state.Index("allocs")
138			if err != nil {
139				return err
140			}
141			reply.Index = index
142
143			// Set the query response
144			a.srv.setQueryMeta(&reply.QueryMeta)
145			return nil
146		}}
147	return a.srv.blockingRPC(&opts)
148}
149
150// GetAlloc is used to lookup a particular allocation
151func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
152	reply *structs.SingleAllocResponse) error {
153	if done, err := a.srv.forward("Alloc.GetAlloc", args, args, reply); done {
154		return err
155	}
156	defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
157
158	// Check namespace read-job permissions before performing blocking query.
159	allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityReadJob)
160	aclObj, err := a.srv.ResolveToken(args.AuthToken)
161	if err != nil {
162		// If ResolveToken had an unexpected error return that
163		if err != structs.ErrTokenNotFound {
164			return err
165		}
166
167		// Attempt to lookup AuthToken as a Node.SecretID since nodes
168		// call this endpoint and don't have an ACL token.
169		node, stateErr := a.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
170		if stateErr != nil {
171			// Return the original ResolveToken error with this err
172			var merr multierror.Error
173			merr.Errors = append(merr.Errors, err, stateErr)
174			return merr.ErrorOrNil()
175		}
176
177		// Not a node or a valid ACL token
178		if node == nil {
179			return structs.ErrTokenNotFound
180		}
181	}
182
183	// Setup the blocking query
184	opts := blockingOptions{
185		queryOpts: &args.QueryOptions,
186		queryMeta: &reply.QueryMeta,
187		run: func(ws memdb.WatchSet, state *state.StateStore) error {
188			// Lookup the allocation
189			out, err := state.AllocByID(ws, args.AllocID)
190			if err != nil {
191				return err
192			}
193
194			// Setup the output
195			reply.Alloc = out
196			if out != nil {
197				// Re-check namespace in case it differs from request.
198				if !allowNsOp(aclObj, out.Namespace) {
199					return structs.NewErrUnknownAllocation(args.AllocID)
200				}
201
202				reply.Index = out.ModifyIndex
203			} else {
204				// Use the last index that affected the allocs table
205				index, err := state.Index("allocs")
206				if err != nil {
207					return err
208				}
209				reply.Index = index
210			}
211
212			// Set the query response
213			a.srv.setQueryMeta(&reply.QueryMeta)
214			return nil
215		}}
216	return a.srv.blockingRPC(&opts)
217}
218
219// GetAllocs is used to lookup a set of allocations
220func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
221	reply *structs.AllocsGetResponse) error {
222	if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
223		return err
224	}
225	defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())
226
227	allocs := make([]*structs.Allocation, len(args.AllocIDs))
228
229	// Setup the blocking query. We wait for at least one of the requested
230	// allocations to be above the min query index. This guarantees that the
231	// server has received that index.
232	opts := blockingOptions{
233		queryOpts: &args.QueryOptions,
234		queryMeta: &reply.QueryMeta,
235		run: func(ws memdb.WatchSet, state *state.StateStore) error {
236			// Lookup the allocation
237			thresholdMet := false
238			maxIndex := uint64(0)
239			for i, alloc := range args.AllocIDs {
240				out, err := state.AllocByID(ws, alloc)
241				if err != nil {
242					return err
243				}
244				if out == nil {
245					// We don't have the alloc yet
246					thresholdMet = false
247					break
248				}
249
250				// Store the pointer
251				allocs[i] = out
252
253				// Check if we have passed the minimum index
254				if out.ModifyIndex > args.QueryOptions.MinQueryIndex {
255					thresholdMet = true
256				}
257
258				if maxIndex < out.ModifyIndex {
259					maxIndex = out.ModifyIndex
260				}
261			}
262
263			// Setup the output
264			if thresholdMet {
265				reply.Allocs = allocs
266				reply.Index = maxIndex
267			} else {
268				// Use the last index that affected the nodes table
269				index, err := state.Index("allocs")
270				if err != nil {
271					return err
272				}
273				reply.Index = index
274			}
275
276			// Set the query response
277			a.srv.setQueryMeta(&reply.QueryMeta)
278			return nil
279		},
280	}
281	return a.srv.blockingRPC(&opts)
282}
283
284// Stop is used to stop an allocation and migrate it to another node.
285func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopResponse) error {
286	if done, err := a.srv.forward("Alloc.Stop", args, args, reply); done {
287		return err
288	}
289	defer metrics.MeasureSince([]string{"nomad", "alloc", "stop"}, time.Now())
290
291	alloc, err := getAlloc(a.srv.State(), args.AllocID)
292	if err != nil {
293		return err
294	}
295
296	// Check for namespace alloc-lifecycle permissions.
297	allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityAllocLifecycle)
298	aclObj, err := a.srv.ResolveToken(args.AuthToken)
299	if err != nil {
300		return err
301	} else if !allowNsOp(aclObj, alloc.Namespace) {
302		return structs.ErrPermissionDenied
303	}
304
305	now := time.Now().UTC().UnixNano()
306	eval := &structs.Evaluation{
307		ID:             uuid.Generate(),
308		Namespace:      alloc.Namespace,
309		Priority:       alloc.Job.Priority,
310		Type:           alloc.Job.Type,
311		TriggeredBy:    structs.EvalTriggerAllocStop,
312		JobID:          alloc.Job.ID,
313		JobModifyIndex: alloc.Job.ModifyIndex,
314		Status:         structs.EvalStatusPending,
315		CreateTime:     now,
316		ModifyTime:     now,
317	}
318
319	transitionReq := &structs.AllocUpdateDesiredTransitionRequest{
320		Evals: []*structs.Evaluation{eval},
321		Allocs: map[string]*structs.DesiredTransition{
322			args.AllocID: {
323				Migrate: helper.BoolToPtr(true),
324			},
325		},
326	}
327
328	// Commit this update via Raft
329	_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, transitionReq)
330	if err != nil {
331		a.logger.Error("AllocUpdateDesiredTransitionRequest failed", "error", err)
332		return err
333	}
334
335	// Setup the response
336	reply.Index = index
337	reply.EvalID = eval.ID
338	return nil
339}
340
341// UpdateDesiredTransition is used to update the desired transitions of an
342// allocation.
343func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error {
344	if done, err := a.srv.forward("Alloc.UpdateDesiredTransition", args, args, reply); done {
345		return err
346	}
347	defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transition"}, time.Now())
348
349	// Check that it is a management token.
350	if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil {
351		return err
352	} else if aclObj != nil && !aclObj.IsManagement() {
353		return structs.ErrPermissionDenied
354	}
355
356	// Ensure at least a single alloc
357	if len(args.Allocs) == 0 {
358		return fmt.Errorf("must update at least one allocation")
359	}
360
361	// Commit this update via Raft
362	_, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args)
363	if err != nil {
364		a.logger.Error("AllocUpdateDesiredTransitionRequest failed", "error", err)
365		return err
366	}
367
368	// Setup the response
369	reply.Index = index
370	return nil
371}
372