1package nomad 2 3import ( 4 "fmt" 5 "time" 6 7 metrics "github.com/armon/go-metrics" 8 log "github.com/hashicorp/go-hclog" 9 memdb "github.com/hashicorp/go-memdb" 10 multierror "github.com/hashicorp/go-multierror" 11 12 "github.com/hashicorp/nomad/acl" 13 "github.com/hashicorp/nomad/helper" 14 "github.com/hashicorp/nomad/helper/uuid" 15 "github.com/hashicorp/nomad/nomad/state" 16 "github.com/hashicorp/nomad/nomad/structs" 17) 18 19// Alloc endpoint is used for manipulating allocations 20type Alloc struct { 21 srv *Server 22 logger log.Logger 23} 24 25// List is used to list the allocations in the system 26func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListResponse) error { 27 if done, err := a.srv.forward("Alloc.List", args, args, reply); done { 28 return err 29 } 30 defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now()) 31 32 if args.RequestNamespace() == structs.AllNamespacesSentinel { 33 return a.listAllNamespaces(args, reply) 34 } 35 36 // Check namespace read-job permissions 37 aclObj, err := a.srv.ResolveToken(args.AuthToken) 38 if err != nil { 39 return err 40 } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) { 41 return structs.ErrPermissionDenied 42 } 43 44 // Setup the blocking query 45 opts := blockingOptions{ 46 queryOpts: &args.QueryOptions, 47 queryMeta: &reply.QueryMeta, 48 run: func(ws memdb.WatchSet, state *state.StateStore) error { 49 // Capture all the allocations 50 var err error 51 var iter memdb.ResultIterator 52 53 prefix := args.QueryOptions.Prefix 54 if prefix != "" { 55 iter, err = state.AllocsByIDPrefix(ws, args.RequestNamespace(), prefix) 56 } else { 57 iter, err = state.AllocsByNamespace(ws, args.RequestNamespace()) 58 } 59 if err != nil { 60 return err 61 } 62 63 var allocs []*structs.AllocListStub 64 for { 65 raw := iter.Next() 66 if raw == nil { 67 break 68 } 69 alloc := raw.(*structs.Allocation) 70 allocs = append(allocs, alloc.Stub(args.Fields)) 71 } 72 reply.Allocations = allocs 73 74 // Use the last index that affected the jobs table 75 index, err := state.Index("allocs") 76 if err != nil { 77 return err 78 } 79 reply.Index = index 80 81 // Set the query response 82 a.srv.setQueryMeta(&reply.QueryMeta) 83 return nil 84 }} 85 return a.srv.blockingRPC(&opts) 86} 87 88// listAllNamespaces lists all allocations across all namespaces 89func (a *Alloc) listAllNamespaces(args *structs.AllocListRequest, reply *structs.AllocListResponse) error { 90 // Check for read-job permissions 91 aclObj, err := a.srv.ResolveToken(args.AuthToken) 92 if err != nil { 93 return err 94 } 95 prefix := args.QueryOptions.Prefix 96 allow := func(ns string) bool { 97 return aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob) 98 } 99 100 // Setup the blocking query 101 opts := blockingOptions{ 102 queryOpts: &args.QueryOptions, 103 queryMeta: &reply.QueryMeta, 104 run: func(ws memdb.WatchSet, state *state.StateStore) error { 105 // get list of accessible namespaces 106 allowedNSes, err := allowedNSes(aclObj, state, allow) 107 if err == structs.ErrPermissionDenied { 108 // return empty allocations if token isn't authorized for any 109 // namespace, matching other endpoints 110 reply.Allocations = []*structs.AllocListStub{} 111 } else if err != nil { 112 return err 113 } else { 114 var iter memdb.ResultIterator 115 var err error 116 if prefix != "" { 117 iter, err = state.AllocsByIDPrefixAllNSs(ws, prefix) 118 } else { 119 iter, err = state.Allocs(ws) 120 } 121 if err != nil { 122 return err 123 } 124 125 var allocs []*structs.AllocListStub 126 for raw := iter.Next(); raw != nil; raw = iter.Next() { 127 alloc := raw.(*structs.Allocation) 128 if allowedNSes != nil && !allowedNSes[alloc.Namespace] { 129 continue 130 } 131 allocs = append(allocs, alloc.Stub(args.Fields)) 132 } 133 reply.Allocations = allocs 134 } 135 136 // Use the last index that affected the jobs table 137 index, err := state.Index("allocs") 138 if err != nil { 139 return err 140 } 141 reply.Index = index 142 143 // Set the query response 144 a.srv.setQueryMeta(&reply.QueryMeta) 145 return nil 146 }} 147 return a.srv.blockingRPC(&opts) 148} 149 150// GetAlloc is used to lookup a particular allocation 151func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest, 152 reply *structs.SingleAllocResponse) error { 153 if done, err := a.srv.forward("Alloc.GetAlloc", args, args, reply); done { 154 return err 155 } 156 defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now()) 157 158 // Check namespace read-job permissions before performing blocking query. 159 allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityReadJob) 160 aclObj, err := a.srv.ResolveToken(args.AuthToken) 161 if err != nil { 162 // If ResolveToken had an unexpected error return that 163 if err != structs.ErrTokenNotFound { 164 return err 165 } 166 167 // Attempt to lookup AuthToken as a Node.SecretID since nodes 168 // call this endpoint and don't have an ACL token. 169 node, stateErr := a.srv.fsm.State().NodeBySecretID(nil, args.AuthToken) 170 if stateErr != nil { 171 // Return the original ResolveToken error with this err 172 var merr multierror.Error 173 merr.Errors = append(merr.Errors, err, stateErr) 174 return merr.ErrorOrNil() 175 } 176 177 // Not a node or a valid ACL token 178 if node == nil { 179 return structs.ErrTokenNotFound 180 } 181 } 182 183 // Setup the blocking query 184 opts := blockingOptions{ 185 queryOpts: &args.QueryOptions, 186 queryMeta: &reply.QueryMeta, 187 run: func(ws memdb.WatchSet, state *state.StateStore) error { 188 // Lookup the allocation 189 out, err := state.AllocByID(ws, args.AllocID) 190 if err != nil { 191 return err 192 } 193 194 // Setup the output 195 reply.Alloc = out 196 if out != nil { 197 // Re-check namespace in case it differs from request. 198 if !allowNsOp(aclObj, out.Namespace) { 199 return structs.NewErrUnknownAllocation(args.AllocID) 200 } 201 202 reply.Index = out.ModifyIndex 203 } else { 204 // Use the last index that affected the allocs table 205 index, err := state.Index("allocs") 206 if err != nil { 207 return err 208 } 209 reply.Index = index 210 } 211 212 // Set the query response 213 a.srv.setQueryMeta(&reply.QueryMeta) 214 return nil 215 }} 216 return a.srv.blockingRPC(&opts) 217} 218 219// GetAllocs is used to lookup a set of allocations 220func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, 221 reply *structs.AllocsGetResponse) error { 222 if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done { 223 return err 224 } 225 defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now()) 226 227 allocs := make([]*structs.Allocation, len(args.AllocIDs)) 228 229 // Setup the blocking query. We wait for at least one of the requested 230 // allocations to be above the min query index. This guarantees that the 231 // server has received that index. 232 opts := blockingOptions{ 233 queryOpts: &args.QueryOptions, 234 queryMeta: &reply.QueryMeta, 235 run: func(ws memdb.WatchSet, state *state.StateStore) error { 236 // Lookup the allocation 237 thresholdMet := false 238 maxIndex := uint64(0) 239 for i, alloc := range args.AllocIDs { 240 out, err := state.AllocByID(ws, alloc) 241 if err != nil { 242 return err 243 } 244 if out == nil { 245 // We don't have the alloc yet 246 thresholdMet = false 247 break 248 } 249 250 // Store the pointer 251 allocs[i] = out 252 253 // Check if we have passed the minimum index 254 if out.ModifyIndex > args.QueryOptions.MinQueryIndex { 255 thresholdMet = true 256 } 257 258 if maxIndex < out.ModifyIndex { 259 maxIndex = out.ModifyIndex 260 } 261 } 262 263 // Setup the output 264 if thresholdMet { 265 reply.Allocs = allocs 266 reply.Index = maxIndex 267 } else { 268 // Use the last index that affected the nodes table 269 index, err := state.Index("allocs") 270 if err != nil { 271 return err 272 } 273 reply.Index = index 274 } 275 276 // Set the query response 277 a.srv.setQueryMeta(&reply.QueryMeta) 278 return nil 279 }, 280 } 281 return a.srv.blockingRPC(&opts) 282} 283 284// Stop is used to stop an allocation and migrate it to another node. 285func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopResponse) error { 286 if done, err := a.srv.forward("Alloc.Stop", args, args, reply); done { 287 return err 288 } 289 defer metrics.MeasureSince([]string{"nomad", "alloc", "stop"}, time.Now()) 290 291 alloc, err := getAlloc(a.srv.State(), args.AllocID) 292 if err != nil { 293 return err 294 } 295 296 // Check for namespace alloc-lifecycle permissions. 297 allowNsOp := acl.NamespaceValidator(acl.NamespaceCapabilityAllocLifecycle) 298 aclObj, err := a.srv.ResolveToken(args.AuthToken) 299 if err != nil { 300 return err 301 } else if !allowNsOp(aclObj, alloc.Namespace) { 302 return structs.ErrPermissionDenied 303 } 304 305 now := time.Now().UTC().UnixNano() 306 eval := &structs.Evaluation{ 307 ID: uuid.Generate(), 308 Namespace: alloc.Namespace, 309 Priority: alloc.Job.Priority, 310 Type: alloc.Job.Type, 311 TriggeredBy: structs.EvalTriggerAllocStop, 312 JobID: alloc.Job.ID, 313 JobModifyIndex: alloc.Job.ModifyIndex, 314 Status: structs.EvalStatusPending, 315 CreateTime: now, 316 ModifyTime: now, 317 } 318 319 transitionReq := &structs.AllocUpdateDesiredTransitionRequest{ 320 Evals: []*structs.Evaluation{eval}, 321 Allocs: map[string]*structs.DesiredTransition{ 322 args.AllocID: { 323 Migrate: helper.BoolToPtr(true), 324 }, 325 }, 326 } 327 328 // Commit this update via Raft 329 _, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, transitionReq) 330 if err != nil { 331 a.logger.Error("AllocUpdateDesiredTransitionRequest failed", "error", err) 332 return err 333 } 334 335 // Setup the response 336 reply.Index = index 337 reply.EvalID = eval.ID 338 return nil 339} 340 341// UpdateDesiredTransition is used to update the desired transitions of an 342// allocation. 343func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error { 344 if done, err := a.srv.forward("Alloc.UpdateDesiredTransition", args, args, reply); done { 345 return err 346 } 347 defer metrics.MeasureSince([]string{"nomad", "alloc", "update_desired_transition"}, time.Now()) 348 349 // Check that it is a management token. 350 if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { 351 return err 352 } else if aclObj != nil && !aclObj.IsManagement() { 353 return structs.ErrPermissionDenied 354 } 355 356 // Ensure at least a single alloc 357 if len(args.Allocs) == 0 { 358 return fmt.Errorf("must update at least one allocation") 359 } 360 361 // Commit this update via Raft 362 _, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, args) 363 if err != nil { 364 a.logger.Error("AllocUpdateDesiredTransitionRequest failed", "error", err) 365 return err 366 } 367 368 // Setup the response 369 reply.Index = index 370 return nil 371} 372