package state

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"strings"
	"time"

	log "github.com/hashicorp/go-hclog"
	memdb "github.com/hashicorp/go-memdb"
	multierror "github.com/hashicorp/go-multierror"
	"github.com/pkg/errors"

	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/stream"
	"github.com/hashicorp/nomad/nomad/structs"
)

// Txn is a transaction against a state store.
// This can be a read or write transaction.
type Txn = *txn

const (
	// NodeRegisterEventRegistered is the message used when the node is
	// registered for the first time.
	NodeRegisterEventRegistered = "Node registered"

	// NodeRegisterEventReregistered is the message used when the node becomes
	// reregistered.
	NodeRegisterEventReregistered = "Node re-registered"
)

// IndexEntry is used with the "index" table
// for managing the latest Raft index affecting a table.
type IndexEntry struct {
	Key   string
	Value uint64
}

// StateStoreConfig is used to configure a new state store
type StateStoreConfig struct {
	// Logger is used to output the state store's logs
	Logger log.Logger

	// Region is the region of the server embedding the state store.
	Region string

	// EnablePublisher is used to enable or disable the event publisher
	EnablePublisher bool

	// EventBufferSize configures the number of events to hold in memory
	EventBufferSize int64
}

// The StateStore is responsible for maintaining all the Nomad
// state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide
// high concurrency for read operations without blocking writes, and
// to provide write availability in the face of reads. EVERY object
// returned as a result of a read against the state store should be
// considered a constant and NEVER modified in place.
type StateStore struct {
	logger log.Logger
	db     *changeTrackerDB

	// config is the passed in configuration
	config *StateStoreConfig

	// abandonCh is used to signal watchers that this state store has been
	// abandoned (usually during a restore). This is only ever closed.
	abandonCh chan struct{}

	// TODO: refactor abandonCh to use a context so that both can use the same
	// cancel mechanism.
	stopEventBroker func()
}

type streamACLDelegate struct {
	s *StateStore
}

func (a *streamACLDelegate) TokenProvider() stream.ACLTokenProvider {
	resolver, _ := a.s.Snapshot()
	return resolver
}

// NewStateStore is used to create a new state store
func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
	// Create the MemDB
	db, err := memdb.NewMemDB(stateStoreSchema())
	if err != nil {
		return nil, fmt.Errorf("state store setup failed: %v", err)
	}

	// Create the state store
	ctx, cancel := context.WithCancel(context.TODO())
	s := &StateStore{
		logger:          config.Logger.Named("state_store"),
		config:          config,
		abandonCh:       make(chan struct{}),
		stopEventBroker: cancel,
	}

	if config.EnablePublisher {
		// Create new event publisher using provided config
		broker, err := stream.NewEventBroker(ctx, &streamACLDelegate{s}, stream.EventBrokerCfg{
			EventBufferSize: config.EventBufferSize,
			Logger:          config.Logger,
		})
		if err != nil {
			return nil, fmt.Errorf("creating state store event broker: %w", err)
		}
		s.db = NewChangeTrackerDB(db, broker, eventsFromChanges)
	} else {
		s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges)
	}

	// Initialize the state store with the default namespace.
	if err := s.namespaceInit(); err != nil {
		return nil, fmt.Errorf("enterprise state store initialization failed: %v", err)
	}

	return s, nil
}

// NewWatchSet returns a new memdb.WatchSet that adds the state store's
// abandonCh as a watcher. This is important in that it will notify when this
// specific state store is no longer valid, usually due to a new snapshot being
// loaded.
func (s *StateStore) NewWatchSet() memdb.WatchSet {
	ws := memdb.NewWatchSet()
	ws.Add(s.AbandonCh())
	return ws
}

func (s *StateStore) EventBroker() (*stream.EventBroker, error) {
	if s.db.publisher == nil {
		return nil, fmt.Errorf("EventBroker not configured")
	}
	return s.db.publisher, nil
}

// namespaceInit ensures the default namespace exists.
func (s *StateStore) namespaceInit() error {
	// Create the default namespace. This is safe to do every time we create the
	// state store. There are two main cases: a brand-new cluster, in which case
	// each server will have the same default namespace object, or an existing
	// cluster, in which case a modified default namespace will be overridden by
	// the restore code path.
	defaultNs := &structs.Namespace{
		Name:        structs.DefaultNamespace,
		Description: structs.DefaultNamespaceDescription,
	}

	if err := s.UpsertNamespaces(1, []*structs.Namespace{defaultNs}); err != nil {
		return fmt.Errorf("inserting default namespace failed: %v", err)
	}

	return nil
}

// Config returns the state store configuration.
func (s *StateStore) Config() *StateStoreConfig {
	return s.config
}

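// Example: a minimal sketch of constructing a state store and deriving a
// watch set from it. The logger, region, and buffer size values are assumed
// for illustration.
//
//	store, err := NewStateStore(&StateStoreConfig{
//		Logger:          log.Default(),
//		Region:          "global",
//		EnablePublisher: true,
//		EventBufferSize: 100,
//	})
//	if err != nil {
//		// handle setup error
//	}
//	ws := store.NewWatchSet() // fires when this store is abandoned
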
// Snapshot is used to create a point in time snapshot. Because
// we use MemDB, we just need to snapshot the state of the underlying
// database.
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
	memDBSnap := s.db.memdb.Snapshot()

	store := StateStore{
		logger: s.logger,
		config: s.config,
	}

	// Create a new change tracker DB that does not publish or track changes
	store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges)

	snap := &StateSnapshot{
		StateStore: store,
	}
	return snap, nil
}

// SnapshotMinIndex is used to create a state snapshot where the index is
// guaranteed to be greater than or equal to the index parameter.
//
// Some server operations (such as scheduling) exchange objects via RPC
// concurrent with Raft log application, so they must ensure the state store
// snapshot they are operating on is at or after the index the objects
// retrieved via RPC were applied to the Raft log at.
//
// Callers should maintain their own timer metric as the time this method
// blocks indicates Raft log application latency relative to scheduling.
func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) {
	// Ported from work.go:waitForIndex prior to 0.9

	const backoffBase = 20 * time.Millisecond
	const backoffLimit = 1 * time.Second
	var retries uint
	var retryTimer *time.Timer

	// XXX: Potential optimization is to set up a watch on the state
	// store's index table and only unblock via a trigger rather than
	// polling.
	for {
		// Get the state store's current index
		snapshotIndex, err := s.LatestIndex()
		if err != nil {
			return nil, fmt.Errorf("failed to determine state store's index: %v", err)
		}

		// We only need the FSM state to be as recent as the given index
		if snapshotIndex >= index {
			return s.Snapshot()
		}

		// Exponential back off
		retries++
		if retryTimer == nil {
			// First retry, start at baseline
			retryTimer = time.NewTimer(backoffBase)
		} else {
			// Subsequent retry, reset timer
			deadline := 1 << (2 * retries) * backoffBase
			if deadline > backoffLimit {
				deadline = backoffLimit
			}
			retryTimer.Reset(deadline)
		}

		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-retryTimer.C:
		}
	}
}

// Restore is used to optimize the efficiency of rebuilding
// state by minimizing the number of transactions and checking
// overhead.
func (s *StateStore) Restore() (*StateRestore, error) {
	txn := s.db.WriteTxnRestore()
	r := &StateRestore{
		txn: txn,
	}
	return r, nil
}

// AbandonCh returns a channel you can wait on to know if the state store was
// abandoned.
func (s *StateStore) AbandonCh() <-chan struct{} {
	return s.abandonCh
}

// Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic.
func (s *StateStore) Abandon() {
	s.StopEventBroker()
	close(s.abandonCh)
}

// StopEventBroker calls the cancel func for the state store's event
// publisher. It should be called during server shutdown.
func (s *StateStore) StopEventBroker() {
	s.stopEventBroker()
}

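// Example: a sketch of waiting for the state store to reach a Raft index
// received over RPC before reading from it; the timeout and replyIndex value
// are assumed for illustration.
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	snap, err := store.SnapshotMinIndex(ctx, replyIndex)
//	if err != nil {
//		// err is ctx.Err() if the index was not reached in time
//	}
//	_ = snap // a point-in-time snapshot at or after replyIndex
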
// QueryFn is the definition of a function that can be used to implement a basic
// blocking query against the state store.
type QueryFn func(memdb.WatchSet, *StateStore) (resp interface{}, index uint64, err error)

// BlockingQuery takes a query function and runs the function until the minimum
// query index is met or until the passed context is cancelled.
func (s *StateStore) BlockingQuery(query QueryFn, minIndex uint64, ctx context.Context) (
	resp interface{}, index uint64, err error) {

RUN_QUERY:
	// We capture the state store and its abandon channel but pass a snapshot to
	// the blocking query function. We operate on the snapshot to allow separate
	// calls to the state store not all wrapped within the same transaction.
	abandonCh := s.AbandonCh()
	snap, _ := s.Snapshot()
	stateSnap := &snap.StateStore

	// We can skip all watch tracking if this isn't a blocking query.
	var ws memdb.WatchSet
	if minIndex > 0 {
		ws = memdb.NewWatchSet()

		// This channel will be closed if a snapshot is restored and the
		// whole state store is abandoned.
		ws.Add(abandonCh)
	}

	resp, index, err = query(ws, stateSnap)
	if err != nil {
		return nil, index, err
	}

	// We haven't reached the min-index yet.
	if minIndex > 0 && index <= minIndex {
		if err := ws.WatchCtx(ctx); err != nil {
			return nil, index, err
		}

		goto RUN_QUERY
	}

	return resp, index, nil
}

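// Example: a sketch of a blocking query over the jobs table. The minimum
// index is assumed for illustration, and Index is assumed here to report a
// table's latest index.
//
//	queryFn := func(ws memdb.WatchSet, s *StateStore) (interface{}, uint64, error) {
//		job, err := s.JobByID(ws, structs.DefaultNamespace, "example")
//		if err != nil {
//			return nil, 0, err
//		}
//		idx, err := s.Index("jobs")
//		if err != nil {
//			return nil, 0, err
//		}
//		return job, idx, nil
//	}
//	resp, idx, err := store.BlockingQuery(queryFn, 1000, context.Background())
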
// UpsertPlanResults is used to upsert the results of a plan.
func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64, results *structs.ApplyPlanResultsRequest) error {
	snapshot, err := s.Snapshot()
	if err != nil {
		return err
	}

	allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped)
	if err != nil {
		return err
	}

	allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted)
	if err != nil {
		return err
	}

	// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
	results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions)
	if err != nil {
		return err
	}

	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	// Upsert the newly created or updated deployment
	if results.Deployment != nil {
		if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil {
			return err
		}
	}

	// Update the status of deployments affected by the plan.
	if len(results.DeploymentUpdates) != 0 {
		if err := s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn); err != nil {
			return err
		}
	}

	if results.EvalID != "" {
		// Update the modify index of the eval id
		if err := s.updateEvalModifyIndex(txn, index, results.EvalID); err != nil {
			return err
		}
	}

	numAllocs := 0
	if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 {
		// COMPAT 0.11: This branch will be removed when Alloc is removed.
		// Attach the job to all the allocations. It is pulled out in the payload to
		// avoid the redundancy of encoding, but should be denormalized prior to
		// being inserted into MemDB.
		addComputedAllocAttrs(results.Alloc, results.Job)
		numAllocs = len(results.Alloc) + len(results.NodePreemptions)
	} else {
		// Attach the job to all the allocations. It is pulled out in the payload to
		// avoid the redundancy of encoding, but should be denormalized prior to
		// being inserted into MemDB.
		addComputedAllocAttrs(results.AllocsUpdated, results.Job)
		numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
	}

	allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)

	// COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed
	allocsToUpsert = append(allocsToUpsert, results.Alloc...)
	allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)

	allocsToUpsert = append(allocsToUpsert, allocsStopped...)
	allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
	allocsToUpsert = append(allocsToUpsert, allocsPreempted...)

	// handle upgrade path
	for _, alloc := range allocsToUpsert {
		alloc.Canonicalize()
	}

	if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
		return err
	}

	// Upsert followup evals for allocs that were preempted
	for _, eval := range results.PreemptionEvals {
		if err := s.nestedUpsertEval(txn, index, eval); err != nil {
			return err
		}
	}

	return txn.Commit()
}

// addComputedAllocAttrs adds the computed/derived attributes to the allocation.
// This method is used when an allocation is being denormalized.
func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) {
	structs.DenormalizeAllocationJobs(job, allocs)

	// COMPAT(0.11): Remove in 0.11
	// Calculate the total resources of allocations. It is pulled out in the
	// payload to avoid encoding something that can be computed, but should be
	// denormalized prior to being inserted into MemDB.
	for _, alloc := range allocs {
		if alloc.Resources != nil {
			continue
		}

		alloc.Resources = new(structs.Resources)
		for _, task := range alloc.TaskResources {
			alloc.Resources.Add(task)
		}

		// Add the shared resources
		alloc.Resources.Add(alloc.SharedResources)
	}
}

// upsertDeploymentUpdates updates the deployments given the passed status
// updates.
func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *txn) error {
	for _, u := range updates {
		if err := s.updateDeploymentStatusImpl(index, u, txn); err != nil {
			return err
		}
	}

	return nil
}

// UpsertJobSummary upserts a job summary into the state store.
func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	// Check if the job summary already exists
	existing, err := txn.First("job_summary", "id", jobSummary.Namespace, jobSummary.JobID)
	if err != nil {
		return fmt.Errorf("job summary lookup failed: %v", err)
	}

	// Setup the indexes correctly
	if existing != nil {
		jobSummary.CreateIndex = existing.(*structs.JobSummary).CreateIndex
		jobSummary.ModifyIndex = index
	} else {
		jobSummary.CreateIndex = index
		jobSummary.ModifyIndex = index
	}

	// Insert the job summary
	if err := txn.Insert("job_summary", jobSummary); err != nil {
		return err
	}

	// Update the indexes table for job summary
	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

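// Example: a sketch of seeding a job summary before inserting allocations,
// as tests commonly do; the index and task group name are assumed.
//
//	summary := &structs.JobSummary{
//		JobID:     "example",
//		Namespace: structs.DefaultNamespace,
//		Summary:   map[string]structs.TaskGroupSummary{"web": {}},
//	}
//	if err := store.UpsertJobSummary(900, summary); err != nil {
//		// handle error
//	}
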
// DeleteJobSummary deletes the job summary with the given ID. This is for
// testing purposes only.
func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	// Delete the job summary
	if _, err := txn.DeleteAll("job_summary", "id", namespace, id); err != nil {
		return fmt.Errorf("deleting job summary failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return txn.Commit()
}

// UpsertDeployment is used to insert or update a deployment.
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()
	if err := s.upsertDeploymentImpl(index, deployment, txn); err != nil {
		return err
	}
	return txn.Commit()
}

func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *txn) error {
	// Check if the deployment already exists
	existing, err := txn.First("deployment", "id", deployment.ID)
	if err != nil {
		return fmt.Errorf("deployment lookup failed: %v", err)
	}

	// Setup the indexes correctly
	if existing != nil {
		deployment.CreateIndex = existing.(*structs.Deployment).CreateIndex
		deployment.ModifyIndex = index
	} else {
		deployment.CreateIndex = index
		deployment.ModifyIndex = index
	}

	// Insert the deployment
	if err := txn.Insert("deployment", deployment); err != nil {
		return err
	}

	// Update the indexes table for deployment
	if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	// If the deployment is being marked as complete, set the job to stable.
	if deployment.Status == structs.DeploymentStatusSuccessful {
		if err := s.updateJobStabilityImpl(index, deployment.Namespace, deployment.JobID, deployment.JobVersion, true, txn); err != nil {
			return fmt.Errorf("failed to update job stability: %v", err)
		}
	}

	return nil
}

func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire deployments table
	iter, err := txn.Get("deployment", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())
	return iter, nil
}

func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the deployments table in the given namespace
	iter, err := txn.Get("deployment", "namespace", namespace)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())
	return iter, nil
}

func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the deployments table by ID prefix
	iter, err := txn.Get("deployment", "id_prefix", deploymentID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	// Wrap the iterator in a filter
	wrap := memdb.NewFilterIterator(iter, deploymentNamespaceFilter(namespace))
	return wrap, nil
}

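// Example: a sketch of consuming the filtered iterator returned by
// DeploymentsByIDPrefix; the prefix is assumed for illustration.
//
//	ws := memdb.NewWatchSet()
//	iter, err := store.DeploymentsByIDPrefix(ws, structs.DefaultNamespace, "c0ffee")
//	if err != nil {
//		// handle error
//	}
//	for raw := iter.Next(); raw != nil; raw = iter.Next() {
//		d := raw.(*structs.Deployment)
//		_ = d // every result shares the ID prefix and namespace
//	}
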
// deploymentNamespaceFilter returns a filter function that filters out all
// deployments not in the given namespace.
func deploymentNamespaceFilter(namespace string) func(interface{}) bool {
	return func(raw interface{}) bool {
		d, ok := raw.(*structs.Deployment)
		if !ok {
			return true
		}

		return d.Namespace != namespace
	}
}

func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) {
	txn := s.db.ReadTxn()
	return s.deploymentByIDImpl(ws, deploymentID, txn)
}

func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *txn) (*structs.Deployment, error) {
	watchCh, existing, err := txn.FirstWatch("deployment", "id", deploymentID)
	if err != nil {
		return nil, fmt.Errorf("deployment lookup failed: %v", err)
	}
	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.Deployment), nil
	}

	return nil, nil
}

func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error) {
	txn := s.db.ReadTxn()

	var job *structs.Job
	// Read job from state store
	_, existing, err := txn.FirstWatch("jobs", "id", namespace, jobID)
	if err != nil {
		return nil, fmt.Errorf("job lookup failed: %v", err)
	}
	if existing != nil {
		job = existing.(*structs.Job)
	}

	// Get an iterator over the deployments
	iter, err := txn.Get("deployment", "job", namespace, jobID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	var out []*structs.Deployment
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		d := raw.(*structs.Deployment)

		// If the deployment belongs to a job with the same ID but a different
		// create index, and we are not fetching all deployments whose job
		// matches the job ID, then skip it.
		if !all && job != nil && d.JobCreateIndex != job.CreateIndex {
			continue
		}
		out = append(out, d)
	}

	return out, nil
}

// LatestDeploymentByJobID returns the latest deployment for the given job. The
// latest is determined strictly by CreateIndex.
func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) {
	txn := s.db.ReadTxn()

	// Get an iterator over the deployments
	iter, err := txn.Get("deployment", "job", namespace, jobID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	var out *structs.Deployment
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}

		d := raw.(*structs.Deployment)
		if out == nil || out.CreateIndex < d.CreateIndex {
			out = d
		}
	}

	return out, nil
}

// DeleteDeployment is used to delete a set of deployments by ID
func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	if len(deploymentIDs) == 0 {
		return nil
	}

	for _, deploymentID := range deploymentIDs {
		// Lookup the deployment
		existing, err := txn.First("deployment", "id", deploymentID)
		if err != nil {
			return fmt.Errorf("deployment lookup failed: %v", err)
		}
		if existing == nil {
			return fmt.Errorf("deployment not found")
		}

		// Delete the deployment
		if err := txn.Delete("deployment", existing); err != nil {
			return fmt.Errorf("deployment delete failed: %v", err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// UpsertScalingEvent is used to insert a new scaling event.
// Only the most recent JobTrackedScalingEvents will be kept.
func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventRequest) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	// Get the existing events
	existing, err := txn.First("scaling_event", "id", req.Namespace, req.JobID)
	if err != nil {
		return fmt.Errorf("scaling event lookup failed: %v", err)
	}

	var jobEvents *structs.JobScalingEvents
	if existing != nil {
		jobEvents = existing.(*structs.JobScalingEvents)
	} else {
		jobEvents = &structs.JobScalingEvents{
			Namespace:     req.Namespace,
			JobID:         req.JobID,
			ScalingEvents: make(map[string][]*structs.ScalingEvent),
		}
	}

	jobEvents.ModifyIndex = index
	req.ScalingEvent.CreateIndex = index

	events := jobEvents.ScalingEvents[req.TaskGroup]
	// Prepend this latest event
	events = append(
		[]*structs.ScalingEvent{req.ScalingEvent},
		events...,
	)
	// Truncate older events
	if len(events) > structs.JobTrackedScalingEvents {
		events = events[0:structs.JobTrackedScalingEvents]
	}
	jobEvents.ScalingEvents[req.TaskGroup] = events

	// Insert the new event
	if err := txn.Insert("scaling_event", jobEvents); err != nil {
		return fmt.Errorf("scaling event insert failed: %v", err)
	}

	// Update the indexes table for scaling_event
	if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// ScalingEvents returns an iterator over all the job scaling events
func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire scaling_event table
	iter, err := txn.Get("scaling_event", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) {
	txn := s.db.ReadTxn()

	watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID)
	if err != nil {
		return nil, 0, fmt.Errorf("job scaling events lookup failed: %v", err)
	}
	ws.Add(watchCh)

	if existing != nil {
		events := existing.(*structs.JobScalingEvents)
		return events.ScalingEvents, events.ModifyIndex, nil
	}
	return nil, 0, nil
}

// UpsertNode is used to register a node or update a node definition.
// This is assumed to be triggered by the client, so we retain the value
// of drain/eligibility which is set by the scheduler.
func (s *StateStore) UpsertNode(msgType structs.MessageType, index uint64, node *structs.Node) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	if err := upsertNodeTxn(txn, index, node); err != nil {
		return err
	}
	return txn.Commit()
}

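// Example: a sketch of registering a node through the FSM path; the index is
// assumed, and node stands for any fully populated *structs.Node.
//
//	if err := store.UpsertNode(structs.NodeRegisterRequestType, 1000, node); err != nil {
//		// handle error
//	}
//	// A later upsert for the same node ID retains CreateIndex, events, and
//	// drain/eligibility state, and records a re-registration event if the
//	// node was previously down.
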
func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
	// Check if the node already exists
	existing, err := txn.First("nodes", "id", node.ID)
	if err != nil {
		return fmt.Errorf("node lookup failed: %v", err)
	}

	// Setup the indexes correctly
	if existing != nil {
		exist := existing.(*structs.Node)
		node.CreateIndex = exist.CreateIndex
		node.ModifyIndex = index

		// Retain node events that have already been set on the node
		node.Events = exist.Events

		// If we are transitioning from down, record the re-registration
		if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown {
			appendNodeEvents(index, node, []*structs.NodeEvent{
				structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
					SetMessage(NodeRegisterEventReregistered).
					SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))})
		}

		node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
		node.DrainStrategy = exist.DrainStrategy                 // Retain the drain strategy
		node.LastDrain = exist.LastDrain                         // Retain the drain metadata
	} else {
		// Because this is the first time the node is being registered, we should
		// also create a node registration event
		nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
			SetMessage(NodeRegisterEventRegistered).
			SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))
		node.Events = []*structs.NodeEvent{nodeEvent}
		node.CreateIndex = index
		node.ModifyIndex = index
	}

	// Insert the node
	if err := txn.Insert("nodes", node); err != nil {
		return fmt.Errorf("node insert failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	if err := upsertNodeCSIPlugins(txn, node, index); err != nil {
		return fmt.Errorf("csi plugin update failed: %v", err)
	}

	return nil
}

// DeleteNode deregisters a batch of nodes
func (s *StateStore) DeleteNode(msgType structs.MessageType, index uint64, nodes []string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	if err := deleteNodeTxn(txn, index, nodes); err != nil {
		return err
	}
	return txn.Commit()
}

func deleteNodeTxn(txn *txn, index uint64, nodes []string) error {
	if len(nodes) == 0 {
		return fmt.Errorf("node ids missing")
	}

	for _, nodeID := range nodes {
		existing, err := txn.First("nodes", "id", nodeID)
		if err != nil {
			return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
		}
		if existing == nil {
			return fmt.Errorf("node not found: %s", nodeID)
		}

		// Delete the node
		if err := txn.Delete("nodes", existing); err != nil {
			return fmt.Errorf("node delete failed: %s: %v", nodeID, err)
		}

		node := existing.(*structs.Node)
		if err := deleteNodeCSIPlugins(txn, node, index); err != nil {
			return fmt.Errorf("csi plugin delete failed: %v", err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// UpdateNodeStatus is used to update the status of a node
func (s *StateStore) UpdateNodeStatus(msgType structs.MessageType, index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	if err := s.updateNodeStatusTxn(txn, nodeID, status, updatedAt, event); err != nil {
		return err
	}

	return txn.Commit()
}

func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
	// Lookup the node
	existing, err := txn.First("nodes", "id", nodeID)
	if err != nil {
		return fmt.Errorf("node lookup failed: %v", err)
	}
	if existing == nil {
		return fmt.Errorf("node not found")
	}

	// Copy the existing node
	existingNode := existing.(*structs.Node)
	copyNode := existingNode.Copy()
	copyNode.StatusUpdatedAt = updatedAt

	// Add the event if given
	if event != nil {
		appendNodeEvents(txn.Index, copyNode, []*structs.NodeEvent{event})
	}

	// Update the status in the copy
	copyNode.Status = status
	copyNode.ModifyIndex = txn.Index

	// Insert the node
	if err := txn.Insert("nodes", copyNode); err != nil {
		return fmt.Errorf("node update failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return nil
}

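// Example: a sketch of marking a node down with an accompanying event; the
// message type, index, and timestamp are assumed for illustration.
//
//	event := structs.NewNodeEvent().
//		SetSubsystem(structs.NodeEventSubsystemCluster).
//		SetMessage("Node heartbeat missed")
//	err := store.UpdateNodeStatus(structs.NodeUpdateStatusRequestType, 1001,
//		node.ID, structs.NodeStatusDown, time.Now().Unix(), event)
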
// BatchUpdateNodeDrain is used to update the drain of a set of nodes.
// This is currently only called when node drain is completed by the drainer.
func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64,
	updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()
	for node, update := range updates {
		if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt,
			events[node], nil, "", true); err != nil {
			return err
		}
	}
	return txn.Commit()
}

// UpdateNodeDrain is used to update the drain of a node
func (s *StateStore) UpdateNodeDrain(msgType structs.MessageType, index uint64, nodeID string,
	drain *structs.DrainStrategy, markEligible bool, updatedAt int64,
	event *structs.NodeEvent, drainMeta map[string]string, accessorId string) error {

	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()
	if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event,
		drainMeta, accessorId, false); err != nil {
		return err
	}
	return txn.Commit()
}

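// Example: a sketch of starting a drain with a one-hour deadline; the index,
// metadata, and accessor ID are assumed for illustration.
//
//	strategy := &structs.DrainStrategy{
//		DrainSpec: structs.DrainSpec{Deadline: time.Hour},
//	}
//	err := store.UpdateNodeDrain(structs.NodeUpdateDrainRequestType, 1002,
//		node.ID, strategy, false, time.Now().Unix(), nil,
//		map[string]string{"reason": "kernel upgrade"}, "accessor-uuid")
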
func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,
	drain *structs.DrainStrategy, markEligible bool, updatedAt int64,
	event *structs.NodeEvent, drainMeta map[string]string, accessorId string,
	drainCompleted bool) error {

	// Lookup the node
	existing, err := txn.First("nodes", "id", nodeID)
	if err != nil {
		return fmt.Errorf("node lookup failed: %v", err)
	}
	if existing == nil {
		return fmt.Errorf("node not found")
	}

	// Copy the existing node
	existingNode := existing.(*structs.Node)
	updatedNode := existingNode.Copy()
	updatedNode.StatusUpdatedAt = updatedAt

	// Add the event if given
	if event != nil {
		appendNodeEvents(index, updatedNode, []*structs.NodeEvent{event})
	}

	// Update the drain in the copy
	updatedNode.DrainStrategy = drain
	if drain != nil {
		updatedNode.SchedulingEligibility = structs.NodeSchedulingIneligible
	} else if markEligible {
		updatedNode.SchedulingEligibility = structs.NodeSchedulingEligible
	}

	// Update LastDrain
	updateTime := time.Unix(updatedAt, 0)

	// If the drain strategy isn't set before or after, this wasn't a drain
	// operation and we don't care about LastDrain. Otherwise, when done with
	// this method, updatedNode.LastDrain should be set: if starting a new drain
	// operation, create a new LastDrain; otherwise, update the existing one.
	drainNoop := existingNode.DrainStrategy == nil && updatedNode.DrainStrategy == nil
	startedDraining := existingNode.DrainStrategy == nil && updatedNode.DrainStrategy != nil
	if !drainNoop {
		if startedDraining {
			updatedNode.LastDrain = &structs.DrainMetadata{
				StartedAt: updateTime,
				Meta:      drainMeta,
			}
		} else if updatedNode.LastDrain == nil {
			// If already draining and LastDrain doesn't exist, we need to create
			// a new one; this could happen if we upgraded to 1.1.x during a drain.
			updatedNode.LastDrain = &structs.DrainMetadata{
				// we don't have sub-second accuracy on these fields, so truncate this
				StartedAt: time.Unix(existingNode.DrainStrategy.StartedAt.Unix(), 0),
				Meta:      drainMeta,
			}
		}

		updatedNode.LastDrain.UpdatedAt = updateTime

		// We won't have new metadata on drain complete, so keep the existing
		// operator-provided metadata; also keep it if the caller didn't provide
		// any.
		if len(drainMeta) != 0 {
			updatedNode.LastDrain.Meta = drainMeta
		}

		// we won't have an accessor ID on drain complete, so don't overwrite the existing one
		if accessorId != "" {
			updatedNode.LastDrain.AccessorID = accessorId
		}

		if updatedNode.DrainStrategy != nil {
			updatedNode.LastDrain.Status = structs.DrainStatusDraining
		} else if drainCompleted {
			updatedNode.LastDrain.Status = structs.DrainStatusComplete
		} else {
			updatedNode.LastDrain.Status = structs.DrainStatusCanceled
		}
	}

	updatedNode.ModifyIndex = index

	// Insert the node
	if err := txn.Insert("nodes", updatedNode); err != nil {
		return fmt.Errorf("node update failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// UpdateNodeEligibility is used to update the scheduling eligibility of a node
func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {

	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	// Lookup the node
	existing, err := txn.First("nodes", "id", nodeID)
	if err != nil {
		return fmt.Errorf("node lookup failed: %v", err)
	}
	if existing == nil {
		return fmt.Errorf("node not found")
	}

	// Copy the existing node
	existingNode := existing.(*structs.Node)
	copyNode := existingNode.Copy()
	copyNode.StatusUpdatedAt = updatedAt

	// Add the event if given
	if event != nil {
		appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
	}

	// Check if this is a valid action
	if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible {
		return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
	}

	// Update the eligibility in the copy
	copyNode.SchedulingEligibility = eligibility
	copyNode.ModifyIndex = index

	// Insert the node
	if err := txn.Insert("nodes", copyNode); err != nil {
		return fmt.Errorf("node update failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// UpsertNodeEvents adds the node events to the nodes, rotating events as
// necessary.
func (s *StateStore) UpsertNodeEvents(msgType structs.MessageType, index uint64, nodeEvents map[string][]*structs.NodeEvent) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	for nodeID, events := range nodeEvents {
		if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil {
			return err
		}
	}

	return txn.Commit()
}

// upsertNodeEvents upserts node events for a respective node. It also ensures
// that only a fixed number of node events are stored simultaneously, deleting
// older events once this bound has been reached.
func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *txn) error {
	// Lookup the node
	existing, err := txn.First("nodes", "id", nodeID)
	if err != nil {
		return fmt.Errorf("node lookup failed: %v", err)
	}
	if existing == nil {
		return fmt.Errorf("node not found")
	}

	// Copy the existing node
	existingNode := existing.(*structs.Node)
	copyNode := existingNode.Copy()
	appendNodeEvents(index, copyNode, events)

	// Insert the node
	if err := txn.Insert("nodes", copyNode); err != nil {
		return fmt.Errorf("node update failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// appendNodeEvents is a helper that takes a node and new events and appends
// them, pruning older events as needed.
func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent) {
	// Add the events, updating the indexes
	for _, e := range events {
		e.CreateIndex = index
		node.Events = append(node.Events, e)
	}

	// Keep node events pruned to not exceed the max allowed
	if l := len(node.Events); l > structs.MaxRetainedNodeEvents {
		delta := l - structs.MaxRetainedNodeEvents
		node.Events = node.Events[delta:]
	}
}

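// Example: a sketch of the pruning behavior. Assuming
// structs.MaxRetainedNodeEvents is 10 and node already holds 10 events,
// appending one more drops the oldest:
//
//	appendNodeEvents(2000, node, []*structs.NodeEvent{newEvent})
//	// len(node.Events) is still structs.MaxRetainedNodeEvents; the oldest
//	// event was dropped and newEvent is last, with CreateIndex 2000.
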
// upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health.
// It's called in upsertNodeTxn, so that event-driven health changes are
// updated.
func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
	loop := func(info *structs.CSIInfo) error {
		raw, err := txn.First("csi_plugins", "id", info.PluginID)
		if err != nil {
			return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err)
		}

		var plug *structs.CSIPlugin
		if raw != nil {
			plug = raw.(*structs.CSIPlugin).Copy()
		} else {
			if !info.Healthy {
				// we don't want to create new plugins for unhealthy
				// allocs, otherwise we'd recreate the plugin when we
				// get the update for the alloc becoming terminal
				return nil
			}
			plug = structs.NewCSIPlugin(info.PluginID, index)
		}

		// The plugin may have been created by the job being updated, in which
		// case this data will not be configured; it's only available to the
		// fingerprint system.
		plug.Provider = info.Provider
		plug.Version = info.ProviderVersion

		err = plug.AddPlugin(node.ID, info)
		if err != nil {
			return err
		}

		plug.ModifyIndex = index

		err = txn.Insert("csi_plugins", plug)
		if err != nil {
			return fmt.Errorf("csi_plugins insert error: %v", err)
		}

		return nil
	}

	inUseController := map[string]struct{}{}
	inUseNode := map[string]struct{}{}

	for _, info := range node.CSIControllerPlugins {
		err := loop(info)
		if err != nil {
			return err
		}
		inUseController[info.PluginID] = struct{}{}
	}

	for _, info := range node.CSINodePlugins {
		err := loop(info)
		if err != nil {
			return err
		}
		inUseNode[info.PluginID] = struct{}{}
	}

	// Remove the client node from any plugin that's not running on it.
	iter, err := txn.Get("csi_plugins", "id")
	if err != nil {
		return fmt.Errorf("csi_plugins lookup failed: %v", err)
	}
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		plug, ok := raw.(*structs.CSIPlugin)
		if !ok {
			continue
		}
		plug = plug.Copy()

		var hadDelete bool
		if _, ok := inUseController[plug.ID]; !ok {
			if _, asController := plug.Controllers[node.ID]; asController {
				err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController)
				if err != nil {
					return err
				}
				hadDelete = true
			}
		}
		if _, ok := inUseNode[plug.ID]; !ok {
			if _, asNode := plug.Nodes[node.ID]; asNode {
				err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode)
				if err != nil {
					return err
				}
				hadDelete = true
			}
		}
		// we check this flag both for performance and to make sure we
		// don't delete a plugin when registering a node plugin but
		// no controller
		if hadDelete {
			err = updateOrGCPlugin(index, txn, plug)
			if err != nil {
				return err
			}
		}
	}

	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// deleteNodeCSIPlugins cleans up CSIInfo node health status; called in DeleteNode
func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
	if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 {
		return nil
	}

	names := map[string]struct{}{}
	for _, info := range node.CSIControllerPlugins {
		names[info.PluginID] = struct{}{}
	}
	for _, info := range node.CSINodePlugins {
		names[info.PluginID] = struct{}{}
	}

	for id := range names {
		raw, err := txn.First("csi_plugins", "id", id)
		if err != nil {
			return fmt.Errorf("csi_plugins lookup error %s: %v", id, err)
		}
		if raw == nil {
			// plugin may have been deregistered but we didn't
			// update the fingerprint yet
			continue
		}

		plug := raw.(*structs.CSIPlugin).Copy()
		err = plug.DeleteNode(node.ID)
		if err != nil {
			return err
		}
		err = updateOrGCPlugin(index, txn, plug)
		if err != nil {
			return err
		}
	}

	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
func updateOrGCPlugin(index uint64, txn Txn, plug *structs.CSIPlugin) error {
	plug.ModifyIndex = index

	if plug.IsEmpty() {
		err := txn.Delete("csi_plugins", plug)
		if err != nil {
			return fmt.Errorf("csi_plugins delete error: %v", err)
		}
	} else {
		err := txn.Insert("csi_plugins", plug)
		if err != nil {
			return fmt.Errorf("csi_plugins update error %s: %v", plug.ID, err)
		}
	}
	return nil
}

// deleteJobFromPlugins removes the allocations of this job from any plugins the
// job is running, possibly deleting the plugin if it's no longer in use. It's
// called in DeleteJobTxn.
func (s *StateStore) deleteJobFromPlugins(index uint64, txn Txn, job *structs.Job) error {
	ws := memdb.NewWatchSet()
	summary, err := s.JobSummaryByID(ws, job.Namespace, job.ID)
	if err != nil {
		return fmt.Errorf("error getting job summary: %v", err)
	}

	allocs, err := s.AllocsByJob(ws, job.Namespace, job.ID, false)
	if err != nil {
		return fmt.Errorf("error getting allocations: %v", err)
	}

	type pair struct {
		pluginID string
		alloc    *structs.Allocation
	}

	plugAllocs := []*pair{}
	found := map[string]struct{}{}

	// Find plugins for allocs that belong to this job
	for _, a := range allocs {
		tg := a.Job.LookupTaskGroup(a.TaskGroup)
		found[tg.Name] = struct{}{}
		for _, t := range tg.Tasks {
			if t.CSIPluginConfig == nil {
				continue
			}
			plugAllocs = append(plugAllocs, &pair{
				pluginID: t.CSIPluginConfig.ID,
				alloc:    a,
			})
		}
	}

	// Find any plugins that do not yet have allocs for this job
	for _, tg := range job.TaskGroups {
		if _, ok := found[tg.Name]; ok {
			continue
		}

		for _, t := range tg.Tasks {
			if t.CSIPluginConfig == nil {
				continue
			}
			plugAllocs = append(plugAllocs, &pair{
				pluginID: t.CSIPluginConfig.ID,
			})
		}
	}

	plugins := map[string]*structs.CSIPlugin{}

	for _, x := range plugAllocs {
		plug, ok := plugins[x.pluginID]

		if !ok {
			plug, err = s.CSIPluginByIDTxn(txn, nil, x.pluginID)
			if err != nil {
				return fmt.Errorf("error getting plugin: %s, %v", x.pluginID, err)
			}
			if plug == nil {
				return fmt.Errorf("plugin missing: %s %v", x.pluginID, err)
			}
			// only copy once, so we update the same plugin on each alloc
			plugins[x.pluginID] = plug.Copy()
			plug = plugins[x.pluginID]
		}

		if x.alloc == nil {
			continue
		}
		err := plug.DeleteAlloc(x.alloc.ID, x.alloc.NodeID)
		if err != nil {
			return err
		}
	}

	for _, plug := range plugins {
		plug.DeleteJob(job, summary)
		err = updateOrGCPlugin(index, txn, plug)
		if err != nil {
			return err
		}
	}

	if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// NodeByID is used to lookup a node by ID
func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) {
	txn := s.db.ReadTxn()

	watchCh, existing, err := txn.FirstWatch("nodes", "id", nodeID)
	if err != nil {
		return nil, fmt.Errorf("node lookup failed: %v", err)
	}
	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.Node), nil
	}
	return nil, nil
}

// NodesByIDPrefix is used to lookup nodes by prefix
func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("nodes", "id_prefix", nodeID)
	if err != nil {
		return nil, fmt.Errorf("node lookup failed: %v", err)
	}
	ws.Add(iter.WatchCh())

	return iter, nil
}

"secret_id", secretID) 1488 if err != nil { 1489 return nil, fmt.Errorf("node lookup by SecretID failed: %v", err) 1490 } 1491 ws.Add(watchCh) 1492 1493 if existing != nil { 1494 return existing.(*structs.Node), nil 1495 } 1496 return nil, nil 1497} 1498 1499// Nodes returns an iterator over all the nodes 1500func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { 1501 txn := s.db.ReadTxn() 1502 1503 // Walk the entire nodes table 1504 iter, err := txn.Get("nodes", "id") 1505 if err != nil { 1506 return nil, err 1507 } 1508 ws.Add(iter.WatchCh()) 1509 return iter, nil 1510} 1511 1512// UpsertJob is used to register a job or update a job definition 1513func (s *StateStore) UpsertJob(msgType structs.MessageType, index uint64, job *structs.Job) error { 1514 txn := s.db.WriteTxnMsgT(msgType, index) 1515 defer txn.Abort() 1516 if err := s.upsertJobImpl(index, job, false, txn); err != nil { 1517 return err 1518 } 1519 return txn.Commit() 1520} 1521 1522// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob, 1523// but in a transaction. Useful for when making multiple modifications atomically 1524func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error { 1525 return s.upsertJobImpl(index, job, false, txn) 1526} 1527 1528// upsertJobImpl is the implementation for registering a job or updating a job definition 1529func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *txn) error { 1530 // Assert the namespace exists 1531 if exists, err := s.namespaceExists(txn, job.Namespace); err != nil { 1532 return err 1533 } else if !exists { 1534 return fmt.Errorf("job %q is in nonexistent namespace %q", job.ID, job.Namespace) 1535 } 1536 1537 // Check if the job already exists 1538 existing, err := txn.First("jobs", "id", job.Namespace, job.ID) 1539 var existingJob *structs.Job 1540 if err != nil { 1541 return fmt.Errorf("job lookup failed: %v", err) 1542 } 1543 1544 // Setup the indexes correctly 1545 if existing != nil { 1546 job.CreateIndex = existing.(*structs.Job).CreateIndex 1547 job.ModifyIndex = index 1548 1549 existingJob = existing.(*structs.Job) 1550 1551 // Bump the version unless asked to keep it. This should only be done 1552 // when changing an internal field such as Stable. 
// upsertJobImpl is the implementation for registering a job or updating a job definition
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *txn) error {
	// Assert the namespace exists
	if exists, err := s.namespaceExists(txn, job.Namespace); err != nil {
		return err
	} else if !exists {
		return fmt.Errorf("job %q is in nonexistent namespace %q", job.ID, job.Namespace)
	}

	// Check if the job already exists
	existing, err := txn.First("jobs", "id", job.Namespace, job.ID)
	var existingJob *structs.Job
	if err != nil {
		return fmt.Errorf("job lookup failed: %v", err)
	}

	// Setup the indexes correctly
	if existing != nil {
		job.CreateIndex = existing.(*structs.Job).CreateIndex
		job.ModifyIndex = index

		existingJob = existing.(*structs.Job)

		// Bump the version unless asked to keep it. This should only be done
		// when changing an internal field such as Stable. A spec change should
		// always come with a version bump.
		if !keepVersion {
			job.JobModifyIndex = index
			if job.Version <= existingJob.Version {
				job.Version = existingJob.Version + 1
			}
		}

		// Compute the job status
		var err error
		job.Status, err = s.getJobStatus(txn, job, false)
		if err != nil {
			return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
		}
	} else {
		job.CreateIndex = index
		job.ModifyIndex = index
		job.JobModifyIndex = index

		if err := s.setJobStatus(index, txn, job, false, ""); err != nil {
			return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
		}

		// Have to get the job again since it could have been updated
		updated, err := txn.First("jobs", "id", job.Namespace, job.ID)
		if err != nil {
			return fmt.Errorf("job lookup failed: %v", err)
		}
		if updated != nil {
			job = updated.(*structs.Job)
		}
	}

	if err := s.updateSummaryWithJob(index, job, txn); err != nil {
		return fmt.Errorf("unable to create job summary: %v", err)
	}

	if err := s.upsertJobVersion(index, job, txn); err != nil {
		return fmt.Errorf("unable to upsert job into job_version table: %v", err)
	}

	if err := s.updateJobScalingPolicies(index, job, txn); err != nil {
		return fmt.Errorf("unable to update job scaling policies: %v", err)
	}

	if err := s.updateJobRecommendations(index, txn, existingJob, job); err != nil {
		return fmt.Errorf("unable to update job recommendations: %v", err)
	}

	if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil {
		return fmt.Errorf("unable to update job csi plugins: %v", err)
	}

	// Insert the job
	if err := txn.Insert("jobs", job); err != nil {
		return fmt.Errorf("job insert failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// DeleteJob is used to deregister a job
func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	err := s.DeleteJobTxn(index, namespace, jobID, txn)
	if err == nil {
		return txn.Commit()
	}
	return err
}

// DeleteJobTxn is used to deregister a job, like DeleteJob, but in a
// transaction. Useful when making multiple modifications atomically.
func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error {
	// Lookup the job
	existing, err := txn.First("jobs", "id", namespace, jobID)
	if err != nil {
		return fmt.Errorf("job lookup failed: %v", err)
	}
	if existing == nil {
		return fmt.Errorf("job not found")
	}

	// Check if we should update a parent job summary
	job := existing.(*structs.Job)
	if job.ParentID != "" {
		summaryRaw, err := txn.First("job_summary", "id", namespace, job.ParentID)
		if err != nil {
			return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
		}

		// Only continue if the summary exists. It could not exist if the parent
		// job was removed.
		if summaryRaw != nil {
			existing := summaryRaw.(*structs.JobSummary)
			pSummary := existing.Copy()
			if pSummary.Children != nil {

				modified := false
				switch job.Status {
				case structs.JobStatusPending:
					pSummary.Children.Pending--
					pSummary.Children.Dead++
					modified = true
				case structs.JobStatusRunning:
					pSummary.Children.Running--
					pSummary.Children.Dead++
					modified = true
				case structs.JobStatusDead:
				default:
					return fmt.Errorf("unknown old job status %q", job.Status)
				}

				if modified {
					// Update the modify index
					pSummary.ModifyIndex = index

					// Insert the summary
					if err := txn.Insert("job_summary", pSummary); err != nil {
						return fmt.Errorf("job summary insert failed: %v", err)
					}
					if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
						return fmt.Errorf("index update failed: %v", err)
					}
				}
			}
		}
	}

	// Delete the job
	if err := txn.Delete("jobs", existing); err != nil {
		return fmt.Errorf("job delete failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	// Delete the job versions
	if err := s.deleteJobVersions(index, job, txn); err != nil {
		return err
	}

	// Cleanup plugins registered by this job, before we delete the summary
	err = s.deleteJobFromPlugins(index, txn, job)
	if err != nil {
		return fmt.Errorf("deleting job from plugin: %v", err)
	}

	// Delete the job summary
	if _, err = txn.DeleteAll("job_summary", "id", namespace, jobID); err != nil {
		return fmt.Errorf("deleting job summary failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	// Delete any remaining job scaling policies
	if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
		return fmt.Errorf("deleting job scaling policies failed: %v", err)
	}

	// Delete any job recommendations
	if err := s.deleteRecommendationsByJob(index, txn, job); err != nil {
		return fmt.Errorf("deleting job recommendations failed: %v", err)
	}

	// Delete the scaling events
	if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
		return fmt.Errorf("deleting job scaling events failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// deleteJobScalingPolicies deletes any scaling policies associated with the job
func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error {
	iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn)
	if err != nil {
		return fmt.Errorf("getting job scaling policies for deletion failed: %v", err)
	}

	// Put them into a slice so there are no safety concerns while actually
	// performing the deletes
	policies := []interface{}{}
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		policies = append(policies, raw)
	}

	// Do the deletes
	for _, p := range policies {
		if err := txn.Delete("scaling_policy", p); err != nil {
			return fmt.Errorf("deleting scaling policy failed: %v", err)
		}
	}

	if len(policies) > 0 {
		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
			return fmt.Errorf("index update failed: %v", err)
		}
	}
	return nil
}

// deleteJobVersions deletes all versions of the given job.
func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *txn) error {
	iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID)
	if err != nil {
		return err
	}

	// Put them into a slice so there are no safety concerns while actually
	// performing the deletes
	jobs := []*structs.Job{}
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}

		// Ensure the ID is an exact match
		j := raw.(*structs.Job)
		if j.ID != job.ID {
			continue
		}

		jobs = append(jobs, j)
	}

	// Do the deletes
	for _, j := range jobs {
		if err := txn.Delete("job_version", j); err != nil {
			return fmt.Errorf("deleting job versions failed: %v", err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{"job_version", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// upsertJobVersion inserts a job into its historic version table and limits the
// number of job versions that are tracked.
func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) error {
	// Insert the job
	if err := txn.Insert("job_version", job); err != nil {
		return fmt.Errorf("failed to insert job into job_version table: %v", err)
	}

	if err := txn.Insert("index", &IndexEntry{"job_version", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	// Get all the historic jobs for this ID
	all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID)
	if err != nil {
		return fmt.Errorf("failed to look up job versions for %q: %v", job.ID, err)
	}

	// If we are below the limit there is no GCing to be done
	if len(all) <= structs.JobTrackedVersions {
		return nil
	}

	// We have to delete a historic job to make room.
	// Find index of the highest versioned stable job
	stableIdx := -1
	for i, j := range all {
		if j.Stable {
			stableIdx = i
			break
		}
	}

	// If the stable job is the oldest version, do a swap to bring it into the
	// keep set.
	max := structs.JobTrackedVersions
	if stableIdx == max {
		all[max-1], all[max] = all[max], all[max-1]
	}

	// Delete the job outside of the set being kept.
	d := all[max]
	if err := txn.Delete("job_version", d); err != nil {
		return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
	}

	return nil
}

// JobByID is used to lookup a job by its ID. JobByID returns the current/latest job
// version.
func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) {
	txn := s.db.ReadTxn()
	return s.JobByIDTxn(ws, namespace, id, txn)
}
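// exampleWatchJob is an illustrative sketch of the watch-set pattern used by
// the read APIs here: look up a job with a fresh watch set, then block on
// ws.Watch until either the watched data may have changed or the timeout
// channel fires (Watch returns true on timeout). The job ID and timeout are
// hypothetical.
func exampleWatchJob(s *StateStore, namespace, id string) (*structs.Job, error) {
	ws := s.NewWatchSet()
	job, err := s.JobByID(ws, namespace, id)
	if err != nil {
		return nil, err
	}

	if timedOut := ws.Watch(time.After(10 * time.Second)); timedOut {
		// No change observed within the timeout; return the last value.
		return job, nil
	}

	// Something changed (or the state store was abandoned); re-read the job.
	return s.JobByID(s.NewWatchSet(), namespace, id)
}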
// JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByIDTxn
// returns the job version accessible through the transaction
func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) {
	watchCh, existing, err := txn.FirstWatch("jobs", "id", namespace, id)
	if err != nil {
		return nil, fmt.Errorf("job lookup failed: %v", err)
	}
	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.Job), nil
	}
	return nil, nil
}

// JobsByIDPrefix is used to lookup a job by prefix
func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("jobs", "id_prefix", namespace, id)
	if err != nil {
		return nil, fmt.Errorf("job lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// JobVersionsByID returns all the tracked versions of a job.
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
	txn := s.db.ReadTxn()

	return s.jobVersionByID(txn, ws, namespace, id)
}

// jobVersionByID is the underlying implementation for retrieving all tracked
// versions of a job and is called under an existing transaction. A watch set
// can optionally be passed in to add the job histories to the watch set.
func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
	// Get all the historic jobs for this ID
	iter, err := txn.Get("job_version", "id_prefix", namespace, id)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	var all []*structs.Job
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}

		// Ensure the ID is an exact match
		j := raw.(*structs.Job)
		if j.ID != id {
			continue
		}

		all = append(all, j)
	}

	// Sort in reverse order so that the highest version is first
	sort.Slice(all, func(i, j int) bool {
		return all[i].Version > all[j].Version
	})

	return all, nil
}

// JobByIDAndVersion returns the job identified by its ID and Version. The
// passed watchset may be nil.
func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) {
	txn := s.db.ReadTxn()
	return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn)
}

// jobByIDAndVersionImpl returns the job identified by its ID and Version. The
// passed watchset may be nil.
func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string,
	version uint64, txn *txn) (*structs.Job, error) {

	watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version)
	if err != nil {
		return nil, err
	}

	ws.Add(watchCh)

	if existing != nil {
		job := existing.(*structs.Job)
		return job, nil
	}

	return nil, nil
}

func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire job_version table
	iter, err := txn.Get("job_version", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())
	return iter, nil
}

// Jobs returns an iterator over all the jobs
func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire jobs table
	iter, err := txn.Get("jobs", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// JobsByNamespace returns an iterator over all the jobs for the given namespace
func (s *StateStore) JobsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()
	return s.jobsByNamespaceImpl(ws, namespace, txn)
}

// jobsByNamespaceImpl returns an iterator over all the jobs for the given namespace
func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *txn) (memdb.ResultIterator, error) {
	// Walk the entire jobs table
	iter, err := txn.Get("jobs", "id_prefix", namespace, "")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs.
func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("jobs", "periodic", periodic)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// JobsByScheduler returns an iterator over all the jobs with the specific
// scheduler type.
func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Return an iterator for jobs with the specific type.
	iter, err := txn.Get("jobs", "type", schedulerType)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage
// collection.
func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("jobs", "gc", gc)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}
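// exampleCountSystemJobs is an illustrative sketch of consuming the
// memdb.ResultIterator values returned by the listing methods above: call
// Next until it returns nil, type-asserting each raw element. It assumes
// structs.JobTypeSystem as the scheduler type.
func exampleCountSystemJobs(s *StateStore) (int, error) {
	iter, err := s.JobsByScheduler(memdb.NewWatchSet(), structs.JobTypeSystem)
	if err != nil {
		return 0, err
	}

	n := 0
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		if _, ok := raw.(*structs.Job); ok {
			n++
		}
	}
	return n, nil
}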
// JobSummaryByID returns a job summary object which matches a specific id.
func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) {
	txn := s.db.ReadTxn()

	watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID)
	if err != nil {
		return nil, err
	}

	ws.Add(watchCh)

	if existing != nil {
		summary := existing.(*structs.JobSummary)
		return summary, nil
	}

	return nil, nil
}

// JobSummaries walks the entire job summary table and returns all the job
// summary objects
func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("job_summary", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// JobSummaryByPrefix is used to look up Job Summary by id prefix
func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("job_summary", "id_prefix", namespace, id)
	if err != nil {
		return nil, fmt.Errorf("job_summary lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// CSIVolumeRegister adds a volume to the server store, failing if it already exists
func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	for _, v := range volumes {
		if exists, err := s.namespaceExists(txn, v.Namespace); err != nil {
			return err
		} else if !exists {
			return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
		}

		// Check for volume existence
		obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
		if err != nil {
			return fmt.Errorf("volume existence check error: %v", err)
		}
		if obj != nil {
			// Allow some properties of a volume to be updated in place, but
			// prevent accidentally overwriting important properties, or
			// overwriting a volume in use
			old, ok := obj.(*structs.CSIVolume)
			if ok &&
				(old.InUse() ||
					old.ExternalID != v.ExternalID ||
					old.PluginID != v.PluginID ||
					old.Provider != v.Provider) {
				return fmt.Errorf("volume exists: %s", v.ID)
			}
		}

		if v.CreateIndex == 0 {
			v.CreateIndex = index
			v.ModifyIndex = index
		}

		// Allocations are copy on write, so we want to keep the Allocation ID
		// but we need to clear the pointer so that we don't store it when we
		// write the volume to the state store. We'll get it from the db in
		// denormalize.
		for allocID := range v.ReadAllocs {
			v.ReadAllocs[allocID] = nil
		}
		for allocID := range v.WriteAllocs {
			v.WriteAllocs[allocID] = nil
		}

		err = txn.Insert("csi_volumes", v)
		if err != nil {
			return fmt.Errorf("volume insert: %v", err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}
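// exampleReregisterVolume is an illustrative sketch of the register guard
// above: re-registering an existing volume only succeeds while it is unused
// and its identifying fields (ExternalID, PluginID, Provider) are unchanged.
// The IDs and indexes are hypothetical.
func exampleReregisterVolume(s *StateStore) error {
	vol := &structs.CSIVolume{
		ID:        "vol-example",
		Namespace: structs.DefaultNamespace,
		PluginID:  "plugin-example",
	}
	if err := s.CSIVolumeRegister(1000, []*structs.CSIVolume{vol}); err != nil {
		return err
	}

	// Changing PluginID on re-register trips the "volume exists" check.
	changed := vol.Copy()
	changed.PluginID = "other-plugin"
	return s.CSIVolumeRegister(1001, []*structs.CSIVolume{changed})
}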
// CSIVolumes returns the unfiltered list of all volumes. Caller should
// snapshot if it wants to also denormalize the plugins.
func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()
	defer txn.Abort()

	iter, err := txn.Get("csi_volumes", "id")
	if err != nil {
		return nil, fmt.Errorf("csi_volumes lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// CSIVolumeByID is used to lookup a single volume. Returns a copy of the
// volume because its plugins and allocations are denormalized to provide
// accurate Health.
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
	txn := s.db.ReadTxn()

	watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
	if err != nil {
		return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err)
	}
	ws.Add(watchCh)

	if obj == nil {
		return nil, nil
	}
	vol := obj.(*structs.CSIVolume)

	// we return the volume with the plugins denormalized by default,
	// because the scheduler needs them for feasibility checking
	return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy())
}

// CSIVolumesByPluginID looks up csi_volumes by pluginID. Caller should
// snapshot if it wants to also denormalize the plugins.
func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
	if err != nil {
		return nil, fmt.Errorf("volume lookup failed: %v", err)
	}

	// Filter the iterator by namespace and ID prefix; the filter func
	// discards any volume for which it returns true
	f := func(raw interface{}) bool {
		v, ok := raw.(*structs.CSIVolume)
		if !ok {
			return true
		}
		return v.Namespace != namespace || !strings.HasPrefix(v.ID, prefix)
	}

	wrap := memdb.NewFilterIterator(iter, f)
	return wrap, nil
}

// CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to
// also denormalize the plugins.
func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}
// CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should
// snapshot if it wants to also denormalize the plugins.
func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {
	allocs, err := s.AllocsByNode(ws, nodeID)
	if err != nil {
		return nil, fmt.Errorf("alloc lookup failed: %v", err)
	}

	// Find volume ids for CSI volumes in running allocs, or allocs that we desire to run
	ids := map[string]string{} // Map volumeID to Namespace
	for _, a := range allocs {
		tg := a.Job.LookupTaskGroup(a.TaskGroup)

		if !(a.DesiredStatus == structs.AllocDesiredStatusRun ||
			a.ClientStatus == structs.AllocClientStatusRunning) ||
			len(tg.Volumes) == 0 {
			continue
		}

		for _, v := range tg.Volumes {
			if v.Type != structs.VolumeTypeCSI {
				continue
			}
			ids[v.Source] = a.Namespace
		}
	}

	// Lookup the raw CSIVolumes to match the other list interfaces
	iter := NewSliceIterator()
	txn := s.db.ReadTxn()
	for id, namespace := range ids {
		if strings.HasPrefix(id, prefix) {
			watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
			if err != nil {
				return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
			}
			ws.Add(watchCh)
			iter.Add(raw)
		}
	}

	return iter, nil
}

// CSIVolumesByNamespace looks up volumes in the given namespace, optionally
// filtered by ID prefix
func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix)
	if err != nil {
		return nil, fmt.Errorf("volume lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// CSIVolumeClaim updates the volume's claim count and allocation list
func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *structs.CSIVolumeClaim) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	row, err := txn.First("csi_volumes", "id", namespace, id)
	if err != nil {
		return fmt.Errorf("volume lookup failed: %s: %v", id, err)
	}
	if row == nil {
		return fmt.Errorf("volume not found: %s", id)
	}

	orig, ok := row.(*structs.CSIVolume)
	if !ok {
		return fmt.Errorf("volume row conversion error")
	}

	var alloc *structs.Allocation
	if claim.State == structs.CSIVolumeClaimStateTaken {
		alloc, err = s.allocByIDImpl(txn, nil, claim.AllocationID)
		if err != nil {
			s.logger.Error("AllocByID failed", "error", err)
			return fmt.Errorf(structs.ErrUnknownAllocationPrefix)
		}
		if alloc == nil {
			s.logger.Error("AllocByID failed to find alloc", "alloc_id", claim.AllocationID)
			return fmt.Errorf(structs.ErrUnknownAllocationPrefix)
		}
	}

	volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy())
	if err != nil {
		return err
	}
	volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume)
	if err != nil {
		return err
	}

	// in the case of a job deregistration, there will be no allocation ID
	// for the claim but we still want to write an updated index to the volume
	// so that volume reaping is triggered
	if claim.AllocationID != "" {
		err = volume.Claim(claim, alloc)
		if err != nil {
			return err
		}
	}

	volume.ModifyIndex = index

	// Allocations are copy on write, so we want to keep the Allocation ID
	// but we need to clear the pointer so that we don't store it when we
	// write the volume to the state store. We'll get it from the db in
	// denormalize.
	for allocID := range volume.ReadAllocs {
		volume.ReadAllocs[allocID] = nil
	}
	for allocID := range volume.WriteAllocs {
		volume.WriteAllocs[allocID] = nil
	}

	if err = txn.Insert("csi_volumes", volume); err != nil {
		return fmt.Errorf("volume update failed: %s: %v", id, err)
	}

	if err = txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// CSIVolumeDeregister removes the volume from the server
func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	for _, id := range ids {
		existing, err := txn.First("csi_volumes", "id_prefix", namespace, id)
		if err != nil {
			return fmt.Errorf("volume lookup failed: %s: %v", id, err)
		}

		if existing == nil {
			return fmt.Errorf("volume not found: %s", id)
		}

		vol, ok := existing.(*structs.CSIVolume)
		if !ok {
			return fmt.Errorf("volume row conversion error: %s", id)
		}

		// The common case for a volume deregister is when the volume is
		// unused, but we can also let an operator intervene in the case where
		// allocations have been stopped but claims can't be freed because,
		// e.g., the plugins have all been removed.
		if vol.InUse() {
			if !force || !s.volSafeToForce(txn, vol) {
				return fmt.Errorf("volume in use: %s", id)
			}
		}

		if err = txn.Delete("csi_volumes", existing); err != nil {
			return fmt.Errorf("volume delete failed: %s: %v", id, err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// volSafeToForce checks whether any of the remaining allocations
// are in a non-terminal state.
func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool {
	vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v)
	if err != nil {
		return false
	}

	for _, alloc := range vol.ReadAllocs {
		if alloc != nil && !alloc.TerminalStatus() {
			return false
		}
	}
	for _, alloc := range vol.WriteAllocs {
		if alloc != nil && !alloc.TerminalStatus() {
			return false
		}
	}
	return true
}

// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and
// plugins, but without allocations.
// Use this for current volume metadata, handling lists of volumes.
// Use CSIVolumeDenormalize for volumes containing both health and current
// allocations.
func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
	if vol == nil {
		return nil, nil
	}
	txn := s.db.ReadTxn()
	defer txn.Abort()
	return s.CSIVolumeDenormalizePluginsTxn(txn, vol)
}
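// exampleClaimVolume is an illustrative sketch of recording a read claim
// against a volume. The claim is denormalized against the allocation table,
// so the allocation must already be in the state store; the volume ID and
// index are hypothetical.
func exampleClaimVolume(s *StateStore, alloc *structs.Allocation) error {
	claim := &structs.CSIVolumeClaim{
		AllocationID: alloc.ID,
		NodeID:       alloc.NodeID,
		Mode:         structs.CSIVolumeClaimRead,
		State:        structs.CSIVolumeClaimStateTaken,
	}
	return s.CSIVolumeClaim(2000, alloc.Namespace, "vol-example", claim)
}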
// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and
// plugins, but without allocations.
// Use this for current volume metadata, handling lists of volumes.
// Use CSIVolumeDenormalize for volumes containing both health and current
// allocations.
func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
	if vol == nil {
		return nil, nil
	}
	plug, err := s.CSIPluginByIDTxn(txn, nil, vol.PluginID)
	if err != nil {
		return nil, fmt.Errorf("plugin lookup error: %s %v", vol.PluginID, err)
	}
	if plug == nil {
		vol.ControllersHealthy = 0
		vol.NodesHealthy = 0
		vol.Schedulable = false
		return vol, nil
	}

	vol.Provider = plug.Provider
	vol.ProviderVersion = plug.Version
	vol.ControllerRequired = plug.ControllerRequired
	vol.ControllersHealthy = plug.ControllersHealthy
	vol.NodesHealthy = plug.NodesHealthy

	// This value may be stale, but stale is ok
	vol.ControllersExpected = plug.ControllersExpected
	vol.NodesExpected = plug.NodesExpected

	vol.Schedulable = vol.NodesHealthy > 0
	if vol.ControllerRequired {
		vol.Schedulable = vol.ControllersHealthy > 0 && vol.Schedulable
	}

	return vol, nil
}

// CSIVolumeDenormalize returns a CSIVolume with allocations
func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
	txn := s.db.ReadTxn()
	return s.CSIVolumeDenormalizeTxn(txn, ws, vol)
}

// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations
func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
	if vol == nil {
		return nil, nil
	}
	for id := range vol.ReadAllocs {
		a, err := s.allocByIDImpl(txn, ws, id)
		if err != nil {
			return nil, err
		}
		if a != nil {
			vol.ReadAllocs[id] = a
			// COMPAT(1.0): the CSIVolumeClaim fields were added
			// after 0.11.1, so claims made before that may be
			// missing this value. (same for WriteAlloc below)
			if _, ok := vol.ReadClaims[id]; !ok {
				vol.ReadClaims[id] = &structs.CSIVolumeClaim{
					AllocationID: a.ID,
					NodeID:       a.NodeID,
					Mode:         structs.CSIVolumeClaimRead,
					State:        structs.CSIVolumeClaimStateTaken,
				}
			}
		}
	}

	for id := range vol.WriteAllocs {
		a, err := s.allocByIDImpl(txn, ws, id)
		if err != nil {
			return nil, err
		}
		if a != nil {
			vol.WriteAllocs[id] = a
			if _, ok := vol.WriteClaims[id]; !ok {
				vol.WriteClaims[id] = &structs.CSIVolumeClaim{
					AllocationID: a.ID,
					NodeID:       a.NodeID,
					Mode:         structs.CSIVolumeClaimWrite,
					State:        structs.CSIVolumeClaimStateTaken,
				}
			}
		}
	}

	// COMPAT: the AccessMode and AttachmentMode fields were added to claims
	// in 1.1.0, so claims made before that may be missing this value.
	// In this case, the volume will already have AccessMode/AttachmentMode
	// until it no longer has any claims, so set from those values
	for _, claim := range vol.ReadClaims {
		if claim.AccessMode == "" || claim.AttachmentMode == "" {
			claim.AccessMode = vol.AccessMode
			claim.AttachmentMode = vol.AttachmentMode
		}
	}
	for _, claim := range vol.WriteClaims {
		if claim.AccessMode == "" || claim.AttachmentMode == "" {
			claim.AccessMode = vol.AccessMode
			claim.AttachmentMode = vol.AttachmentMode
		}
	}

	return vol, nil
}

// CSIPlugins returns the unfiltered list of all plugin health status
func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()
	defer txn.Abort()

	iter, err := txn.Get("csi_plugins", "id")
	if err != nil {
		return nil, fmt.Errorf("csi_plugins lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// CSIPluginsByIDPrefix supports search
func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("csi_plugins", "id_prefix", pluginID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// CSIPluginByID returns a named CSIPlugin. This method creates a new
// transaction so you should not call it from within another transaction.
func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
	txn := s.db.ReadTxn()
	plugin, err := s.CSIPluginByIDTxn(txn, ws, id)
	if err != nil {
		return nil, err
	}
	return plugin, nil
}

// CSIPluginByIDTxn returns a named CSIPlugin
func (s *StateStore) CSIPluginByIDTxn(txn Txn, ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {

	watchCh, obj, err := txn.FirstWatch("csi_plugins", "id_prefix", id)
	if err != nil {
		return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err)
	}

	ws.Add(watchCh)

	if obj != nil {
		return obj.(*structs.CSIPlugin), nil
	}
	return nil, nil
}

// CSIPluginDenormalize returns a CSIPlugin with allocation details. Always called on a copy of the plugin.
func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPlugin) (*structs.CSIPlugin, error) {
	txn := s.db.ReadTxn()
	return s.CSIPluginDenormalizeTxn(txn, ws, plug)
}

func (s *StateStore) CSIPluginDenormalizeTxn(txn Txn, ws memdb.WatchSet, plug *structs.CSIPlugin) (*structs.CSIPlugin, error) {
	if plug == nil {
		return nil, nil
	}

	// Get the unique list of allocation ids
	ids := map[string]struct{}{}
	for _, info := range plug.Controllers {
		ids[info.AllocID] = struct{}{}
	}
	for _, info := range plug.Nodes {
		ids[info.AllocID] = struct{}{}
	}

	for id := range ids {
		alloc, err := s.allocByIDImpl(txn, ws, id)
		if err != nil {
			return nil, err
		}
		if alloc == nil {
			continue
		}
		plug.Allocations = append(plug.Allocations, alloc.Stub(nil))
	}

	return plug, nil
}
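// exampleListPluginAllocs is an illustrative sketch: fetch a plugin and
// denormalize a copy of it so that Allocations is populated with a stub for
// each controller and node task backing the plugin. The plugin ID is
// hypothetical.
func exampleListPluginAllocs(s *StateStore) ([]*structs.AllocListStub, error) {
	ws := memdb.NewWatchSet()
	plug, err := s.CSIPluginByID(ws, "plugin-example")
	if err != nil || plug == nil {
		return nil, err
	}

	plug, err = s.CSIPluginDenormalize(ws, plug.Copy())
	if err != nil {
		return nil, err
	}
	return plug.Allocations, nil
}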
// UpsertCSIPlugin writes the plugin to the state store. Note: there
// is currently no raft message for this, as it's intended to support
// testing use cases.
func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	existing, err := txn.First("csi_plugins", "id", plug.ID)
	if err != nil {
		return fmt.Errorf("csi_plugin lookup error: %s %v", plug.ID, err)
	}

	plug.ModifyIndex = index
	if existing != nil {
		plug.CreateIndex = existing.(*structs.CSIPlugin).CreateIndex
	}

	err = txn.Insert("csi_plugins", plug)
	if err != nil {
		return fmt.Errorf("csi_plugins insert error: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return txn.Commit()
}

// DeleteCSIPlugin deletes the plugin if it's not in use.
func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	plug, err := s.CSIPluginByIDTxn(txn, nil, id)
	if err != nil {
		return err
	}

	if plug == nil {
		return nil
	}

	plug, err = s.CSIPluginDenormalizeTxn(txn, nil, plug.Copy())
	if err != nil {
		return err
	}
	if !plug.IsEmpty() {
		return fmt.Errorf("plugin in use")
	}

	err = txn.Delete("csi_plugins", plug)
	if err != nil {
		return fmt.Errorf("csi_plugins delete error: %v", err)
	}
	return txn.Commit()
}

// UpsertPeriodicLaunch is used to register a launch or update it.
func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	// Check if the launch already exists
	existing, err := txn.First("periodic_launch", "id", launch.Namespace, launch.ID)
	if err != nil {
		return fmt.Errorf("periodic launch lookup failed: %v", err)
	}

	// Setup the indexes correctly
	if existing != nil {
		launch.CreateIndex = existing.(*structs.PeriodicLaunch).CreateIndex
		launch.ModifyIndex = index
	} else {
		launch.CreateIndex = index
		launch.ModifyIndex = index
	}

	// Insert the launch
	if err := txn.Insert("periodic_launch", launch); err != nil {
		return fmt.Errorf("launch insert failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// DeletePeriodicLaunch is used to delete the periodic launch
func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn)
	if err == nil {
		return txn.Commit()
	}
	return err
}
// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch
// but in a transaction. Useful for when making multiple modifications atomically
func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error {
	// Lookup the launch
	existing, err := txn.First("periodic_launch", "id", namespace, jobID)
	if err != nil {
		return fmt.Errorf("launch lookup failed: %v", err)
	}
	if existing == nil {
		return fmt.Errorf("launch not found")
	}

	// Delete the launch
	if err := txn.Delete("periodic_launch", existing); err != nil {
		return fmt.Errorf("launch delete failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// PeriodicLaunchByID is used to lookup a periodic launch by the periodic job
// ID.
func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) {
	txn := s.db.ReadTxn()

	watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id)
	if err != nil {
		return nil, fmt.Errorf("periodic launch lookup failed: %v", err)
	}

	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.PeriodicLaunch), nil
	}
	return nil, nil
}

// PeriodicLaunches returns an iterator over all the periodic launches
func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire table
	iter, err := txn.Get("periodic_launch", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// UpsertEvals is used to upsert a set of evaluations
func (s *StateStore) UpsertEvals(msgType structs.MessageType, index uint64, evals []*structs.Evaluation) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	err := s.UpsertEvalsTxn(index, evals, txn)
	if err == nil {
		return txn.Commit()
	}
	return err
}
// UpsertEvalsTxn is used to upsert a set of evaluations, like UpsertEvals
// but in a transaction. Useful for when making multiple modifications atomically
func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error {
	// Do a nested upsert
	jobs := make(map[structs.NamespacedID]string, len(evals))
	for _, eval := range evals {
		if err := s.nestedUpsertEval(txn, index, eval); err != nil {
			return err
		}

		tuple := structs.NamespacedID{
			ID:        eval.JobID,
			Namespace: eval.Namespace,
		}
		jobs[tuple] = ""
	}

	// Set the job's status
	if err := s.setJobStatuses(index, txn, jobs, false); err != nil {
		return fmt.Errorf("setting job status failed: %v", err)
	}

	return nil
}

// nestedUpsertEval is used to nest an evaluation upsert within a transaction
func (s *StateStore) nestedUpsertEval(txn *txn, index uint64, eval *structs.Evaluation) error {
	// Lookup the evaluation
	existing, err := txn.First("evals", "id", eval.ID)
	if err != nil {
		return fmt.Errorf("eval lookup failed: %v", err)
	}

	// Update the indexes
	if existing != nil {
		eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex
		eval.ModifyIndex = index
	} else {
		eval.CreateIndex = index
		eval.ModifyIndex = index
	}

	// Update the job summary
	summaryRaw, err := txn.First("job_summary", "id", eval.Namespace, eval.JobID)
	if err != nil {
		return fmt.Errorf("job summary lookup failed: %v", err)
	}
	if summaryRaw != nil {
		js := summaryRaw.(*structs.JobSummary).Copy()
		hasSummaryChanged := false
		for tg, num := range eval.QueuedAllocations {
			if summary, ok := js.Summary[tg]; ok {
				if summary.Queued != num {
					summary.Queued = num
					js.Summary[tg] = summary
					hasSummaryChanged = true
				}
			} else {
				s.logger.Error("unable to update queued for job and task group", "job_id", eval.JobID, "task_group", tg, "namespace", eval.Namespace)
			}
		}

		// Insert the job summary
		if hasSummaryChanged {
			js.ModifyIndex = index
			if err := txn.Insert("job_summary", js); err != nil {
				return fmt.Errorf("job summary insert failed: %v", err)
			}
			if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
				return fmt.Errorf("index update failed: %v", err)
			}
		}
	}

	// Check if the job has any blocked evaluations and cancel them
	if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 {
		// Get the blocked evaluation for a job if it exists
		iter, err := txn.Get("evals", "job", eval.Namespace, eval.JobID, structs.EvalStatusBlocked)
		if err != nil {
			return fmt.Errorf("failed to get blocked evals for job %q in namespace %q: %v", eval.JobID, eval.Namespace, err)
		}

		var blocked []*structs.Evaluation
		for {
			raw := iter.Next()
			if raw == nil {
				break
			}
			blocked = append(blocked, raw.(*structs.Evaluation))
		}

		// Go through and update the evals
		for _, eval := range blocked {
			newEval := eval.Copy()
			newEval.Status = structs.EvalStatusCancelled
			newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID)
			newEval.ModifyIndex = index

			if err := txn.Insert("evals", newEval); err != nil {
				return fmt.Errorf("eval insert failed: %v", err)
			}
		}
	}

	// Insert the eval
	if err := txn.Insert("evals", eval); err != nil {
		return fmt.Errorf("eval insert failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return nil
}

// updateEvalModifyIndex is used to update the modify index of an evaluation that has been
// through a scheduler pass. This is done as part of plan apply. It ensures that when a subsequent
// scheduler worker processes a re-queued evaluation it sees any partial updates from the plan apply.
func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string) error {
	// Lookup the evaluation
	existing, err := txn.First("evals", "id", evalID)
	if err != nil {
		return fmt.Errorf("eval lookup failed: %v", err)
	}
	if existing == nil {
		s.logger.Error("unable to find eval", "eval_id", evalID)
		return fmt.Errorf("unable to find eval id %q", evalID)
	}
	eval := existing.(*structs.Evaluation).Copy()
	// Update the indexes
	eval.ModifyIndex = index

	// Insert the eval
	if err := txn.Insert("evals", eval); err != nil {
		return fmt.Errorf("eval insert failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return nil
}

// DeleteEval is used to delete an evaluation
func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	jobs := make(map[structs.NamespacedID]string, len(evals))
	for _, eval := range evals {
		existing, err := txn.First("evals", "id", eval)
		if err != nil {
			return fmt.Errorf("eval lookup failed: %v", err)
		}
		if existing == nil {
			continue
		}
		if err := txn.Delete("evals", existing); err != nil {
			return fmt.Errorf("eval delete failed: %v", err)
		}
		eval := existing.(*structs.Evaluation)

		tuple := structs.NamespacedID{
			ID:        eval.JobID,
			Namespace: eval.Namespace,
		}
		jobs[tuple] = ""
	}

	for _, alloc := range allocs {
		raw, err := txn.First("allocs", "id", alloc)
		if err != nil {
			return fmt.Errorf("alloc lookup failed: %v", err)
		}
		if raw == nil {
			continue
		}
		if err := txn.Delete("allocs", raw); err != nil {
			return fmt.Errorf("alloc delete failed: %v", err)
		}
	}

	// Update the indexes
	if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	// Set the job's status
	if err := s.setJobStatuses(index, txn, jobs, true); err != nil {
		return fmt.Errorf("setting job status failed: %v", err)
	}

	return txn.Commit()
}
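// exampleGCEval is an illustrative sketch of evaluation garbage collection:
// DeleteEval removes the listed evals and allocs in one transaction, bumps
// both table indexes, and recomputes the status of every job the deleted
// evals referenced. The IDs and index are hypothetical.
func exampleGCEval(s *StateStore, evalID string, allocIDs []string) error {
	return s.DeleteEval(3000, []string{evalID}, allocIDs)
}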
// EvalByID is used to lookup an eval by its ID
func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation, error) {
	txn := s.db.ReadTxn()

	watchCh, existing, err := txn.FirstWatch("evals", "id", id)
	if err != nil {
		return nil, fmt.Errorf("eval lookup failed: %v", err)
	}

	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.Evaluation), nil
	}
	return nil, nil
}

// EvalsByIDPrefix is used to lookup evaluations by prefix in a particular
// namespace
func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Get an iterator over all evals by the id prefix
	iter, err := txn.Get("evals", "id_prefix", id)
	if err != nil {
		return nil, fmt.Errorf("eval lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	// Wrap the iterator in a filter
	wrap := memdb.NewFilterIterator(iter, evalNamespaceFilter(namespace))
	return wrap, nil
}

// evalNamespaceFilter returns a filter function that filters all evaluations
// not in the given namespace.
func evalNamespaceFilter(namespace string) func(interface{}) bool {
	return func(raw interface{}) bool {
		eval, ok := raw.(*structs.Evaluation)
		if !ok {
			return true
		}

		return eval.Namespace != namespace
	}
}

// EvalsByJob returns all the evaluations by job id
func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) {
	txn := s.db.ReadTxn()

	// Get an iterator over the evaluations for the job
	iter, err := txn.Get("evals", "job_prefix", namespace, jobID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	var out []*structs.Evaluation
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}

		e := raw.(*structs.Evaluation)

		// Filter non-exact matches
		if e.JobID != jobID {
			continue
		}

		out = append(out, e)
	}
	return out, nil
}

// Evals returns an iterator over all the evaluations
func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire table
	iter, err := txn.Get("evals", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// EvalsByNamespace returns an iterator over all the evaluations in the given
// namespace
func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire table
	iter, err := txn.Get("evals", "namespace", namespace)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// UpdateAllocsFromClient is used to update an allocation based on input
// from a client. While the schedulers are the authority on the allocation for
// most things, some updates are authoritative from the client. Specifically,
// the desired state comes from the schedulers, while the actual state comes
// from clients.
func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	// Handle each of the updated allocations
	for _, alloc := range allocs {
		if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil {
			return err
		}
	}

	// Update the indexes
	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// nestedUpdateAllocFromClient is used to nest an update of an allocation with client status
func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *structs.Allocation) error {
	// Look for existing alloc
	existing, err := txn.First("allocs", "id", alloc.ID)
	if err != nil {
		return fmt.Errorf("alloc lookup failed: %v", err)
	}

	// Nothing to do if this does not exist
	if existing == nil {
		return nil
	}
	exist := existing.(*structs.Allocation)

	// Copy everything from the existing allocation
	copyAlloc := exist.Copy()

	// Pull in anything the client is the authority on
	copyAlloc.ClientStatus = alloc.ClientStatus
	copyAlloc.ClientDescription = alloc.ClientDescription
	copyAlloc.TaskStates = alloc.TaskStates
	copyAlloc.NetworkStatus = alloc.NetworkStatus

	// The client can only set its deployment health and timestamp, so just take
	// those
	if copyAlloc.DeploymentStatus != nil && alloc.DeploymentStatus != nil {
		oldHasHealthy := copyAlloc.DeploymentStatus.HasHealth()
		newHasHealthy := alloc.DeploymentStatus.HasHealth()

		// We got new health information from the client
		if newHasHealthy && (!oldHasHealthy || *copyAlloc.DeploymentStatus.Healthy != *alloc.DeploymentStatus.Healthy) {
			// Updated deployment health and timestamp
			copyAlloc.DeploymentStatus.Healthy = helper.BoolToPtr(*alloc.DeploymentStatus.Healthy)
			copyAlloc.DeploymentStatus.Timestamp = alloc.DeploymentStatus.Timestamp
			copyAlloc.DeploymentStatus.ModifyIndex = index
		}
	} else if alloc.DeploymentStatus != nil {
		// First time getting a deployment status so copy everything and just
		// set the index
		copyAlloc.DeploymentStatus = alloc.DeploymentStatus.Copy()
		copyAlloc.DeploymentStatus.ModifyIndex = index
	}

	// Update the modify index
	copyAlloc.ModifyIndex = index

	// Update the modify time
	copyAlloc.ModifyTime = alloc.ModifyTime

	if err := s.updateDeploymentWithAlloc(index, copyAlloc, exist, txn); err != nil {
		return fmt.Errorf("error updating deployment: %v", err)
	}

	if err := s.updateSummaryWithAlloc(index, copyAlloc, exist, txn); err != nil {
		return fmt.Errorf("error updating job summary: %v", err)
	}

	if err := s.updateEntWithAlloc(index, copyAlloc, exist, txn); err != nil {
		return err
	}

	if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
		return err
	}

	// Update the allocation
	if err := txn.Insert("allocs", copyAlloc); err != nil {
		return fmt.Errorf("alloc insert failed: %v", err)
	}

	// Set the job's status
	forceStatus := ""
	if !copyAlloc.TerminalStatus() {
		forceStatus = structs.JobStatusRunning
	}

	tuple := structs.NamespacedID{
		ID:        exist.JobID,
		Namespace: exist.Namespace,
	}
	jobs := map[structs.NamespacedID]string{tuple: forceStatus}

	if err := s.setJobStatuses(index, txn, jobs, false); err != nil {
		return fmt.Errorf("setting job status failed: %v", err)
	}
	return nil
}
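// exampleClientUpdate is an illustrative sketch of the client update path:
// only the client-owned fields on the submitted allocation are merged into
// the stored copy (ClientStatus, ClientDescription, TaskStates,
// NetworkStatus, and deployment health), so an update is typically a copy of
// the allocation with those fields refreshed. The message type value and
// index are hypothetical.
func exampleClientUpdate(s *StateStore, msgType structs.MessageType, existing *structs.Allocation) error {
	update := existing.Copy()
	update.ClientStatus = structs.AllocClientStatusRunning
	update.ClientDescription = "tasks are running"
	return s.UpdateAllocsFromClient(msgType, 4000, []*structs.Allocation{update})
}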
// UpsertAllocs is used to evict a set of allocations and allocate new ones at
// the same time.
func (s *StateStore) UpsertAllocs(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()
	if err := s.upsertAllocsImpl(index, allocs, txn); err != nil {
		return err
	}
	return txn.Commit()
}
The current implementation causes O(n) 3301 // lookups/copies/insertions rather than O(1) 3302 if err := s.updateDeploymentWithAlloc(index, alloc, exist, txn); err != nil { 3303 return fmt.Errorf("error updating deployment: %v", err) 3304 } 3305 3306 if err := s.updateSummaryWithAlloc(index, alloc, exist, txn); err != nil { 3307 return fmt.Errorf("error updating job summary: %v", err) 3308 } 3309 3310 if err := s.updateEntWithAlloc(index, alloc, exist, txn); err != nil { 3311 return err 3312 } 3313 3314 if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil { 3315 return err 3316 } 3317 3318 if err := txn.Insert("allocs", alloc); err != nil { 3319 return fmt.Errorf("alloc insert failed: %v", err) 3320 } 3321 3322 if alloc.PreviousAllocation != "" { 3323 prevAlloc, err := txn.First("allocs", "id", alloc.PreviousAllocation) 3324 if err != nil { 3325 return fmt.Errorf("alloc lookup failed: %v", err) 3326 } 3327 existingPrevAlloc, _ := prevAlloc.(*structs.Allocation) 3328 if existingPrevAlloc != nil { 3329 prevAllocCopy := existingPrevAlloc.Copy() 3330 prevAllocCopy.NextAllocation = alloc.ID 3331 prevAllocCopy.ModifyIndex = index 3332 if err := txn.Insert("allocs", prevAllocCopy); err != nil { 3333 return fmt.Errorf("alloc insert failed: %v", err) 3334 } 3335 } 3336 } 3337 3338 // If the allocation is running, force the job to running status. 3339 forceStatus := "" 3340 if !alloc.TerminalStatus() { 3341 forceStatus = structs.JobStatusRunning 3342 } 3343 3344 tuple := structs.NamespacedID{ 3345 ID: alloc.JobID, 3346 Namespace: alloc.Namespace, 3347 } 3348 jobs[tuple] = forceStatus 3349 } 3350 3351 // Update the indexes 3352 if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { 3353 return fmt.Errorf("index update failed: %v", err) 3354 } 3355 3356 // Set the job's status 3357 if err := s.setJobStatuses(index, txn, jobs, false); err != nil { 3358 return fmt.Errorf("setting job status failed: %v", err) 3359 } 3360 3361 return nil 3362} 3363 3364// UpdateAllocsDesiredTransitions is used to update a set of allocations 3365// desired transitions. 
func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, index uint64, allocs map[string]*structs.DesiredTransition,
	evals []*structs.Evaluation) error {

	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	// Handle each of the updated allocations
	for id, transition := range allocs {
		if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil {
			return err
		}
	}

	for _, eval := range evals {
		if err := s.nestedUpsertEval(txn, index, eval); err != nil {
			return err
		}
	}

	// Update the indexes
	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// nestedUpdateAllocDesiredTransition is used to nest an update of an
// allocation's desired transition
func (s *StateStore) nestedUpdateAllocDesiredTransition(
	txn *txn, index uint64, allocID string,
	transition *structs.DesiredTransition) error {

	// Look for existing alloc
	existing, err := txn.First("allocs", "id", allocID)
	if err != nil {
		return fmt.Errorf("alloc lookup failed: %v", err)
	}

	// Nothing to do if this does not exist
	if existing == nil {
		return nil
	}
	exist := existing.(*structs.Allocation)

	// Copy everything from the existing allocation
	copyAlloc := exist.Copy()

	// Merge the desired transitions
	copyAlloc.DesiredTransition.Merge(transition)

	// Update the modify index
	copyAlloc.ModifyIndex = index

	// Update the allocation
	if err := txn.Insert("allocs", copyAlloc); err != nil {
		return fmt.Errorf("alloc insert failed: %v", err)
	}

	return nil
}

// AllocByID is used to lookup an allocation by its ID
func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) {
	txn := s.db.ReadTxn()
	return s.allocByIDImpl(txn, ws, id)
}

// allocByIDImpl retrieves an allocation and is called under an existing
// transaction. An optional watch set can be passed to add allocations to the
// watch set
func (s *StateStore) allocByIDImpl(txn Txn, ws memdb.WatchSet, id string) (*structs.Allocation, error) {
	watchCh, raw, err := txn.FirstWatch("allocs", "id", id)
	if err != nil {
		return nil, fmt.Errorf("alloc lookup failed: %v", err)
	}

	ws.Add(watchCh)

	if raw == nil {
		return nil, nil
	}
	alloc := raw.(*structs.Allocation)
	return alloc, nil
}

// AllocsByIDPrefix is used to lookup allocs by prefix
func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("allocs", "id_prefix", id)
	if err != nil {
		return nil, fmt.Errorf("alloc lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	// Wrap the iterator in a filter
	wrap := memdb.NewFilterIterator(iter, allocNamespaceFilter(namespace))
	return wrap, nil
}
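// exampleMarkMigrate is an illustrative sketch of a desired-transition
// update: the transition is merged into a copy of the stored allocation
// rather than overwriting client-owned state. It assumes DesiredTransition
// carries a Migrate *bool field; the index is hypothetical.
func exampleMarkMigrate(s *StateStore, msgType structs.MessageType, allocID string) error {
	transitions := map[string]*structs.DesiredTransition{
		allocID: {Migrate: helper.BoolToPtr(true)},
	}
	return s.UpdateAllocsDesiredTransitions(msgType, 5000, transitions, nil)
}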
// allocNamespaceFilter returns a filter function that filters all allocations
// not in the given namespace.
func allocNamespaceFilter(namespace string) func(interface{}) bool {
	return func(raw interface{}) bool {
		alloc, ok := raw.(*structs.Allocation)
		if !ok {
			return true
		}

		return alloc.Namespace != namespace
	}
}

// AllocsByIDPrefixAllNSs is used to lookup allocs by prefix across all
// namespaces
func (s *StateStore) AllocsByIDPrefixAllNSs(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("allocs", "id_prefix", prefix)
	if err != nil {
		return nil, fmt.Errorf("alloc lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}

// AllocsByNode returns all the allocations by node
func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) {
	txn := s.db.ReadTxn()

	return allocsByNodeTxn(txn, ws, node)
}

func allocsByNodeTxn(txn ReadTxn, ws memdb.WatchSet, node string) ([]*structs.Allocation, error) {
	// Get an iterator over the node allocations, using only the
	// node prefix which ignores the terminal status
	iter, err := txn.Get("allocs", "node_prefix", node)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	var out []*structs.Allocation
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		out = append(out, raw.(*structs.Allocation))
	}
	return out, nil
}

// AllocsByNodeTerminal returns all the allocations by node and terminal
// status
func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error) {
	txn := s.db.ReadTxn()

	// Get an iterator over the node allocations
	iter, err := txn.Get("allocs", "node", node, terminal)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	var out []*structs.Allocation
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		out = append(out, raw.(*structs.Allocation))
	}
	return out, nil
}

// AllocsByJob returns allocations by job id
func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, anyCreateIndex bool) ([]*structs.Allocation, error) {
	txn := s.db.ReadTxn()

	// Get the job
	var job *structs.Job
	rawJob, err := txn.First("jobs", "id", namespace, jobID)
	if err != nil {
		return nil, err
	}
	if rawJob != nil {
		job = rawJob.(*structs.Job)
	}

	// Get an iterator over the job's allocations
	iter, err := txn.Get("allocs", "job", namespace, jobID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	var out []*structs.Allocation
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}

		alloc := raw.(*structs.Allocation)
		// If the allocation belongs to a job with the same ID but a different
		// create index, and we are not fetching all the allocations whose job
		// matches the job ID, then we skip it
		if !anyCreateIndex && job != nil && alloc.Job.CreateIndex != job.CreateIndex {
			continue
		}
		out = append(out, raw.(*structs.Allocation))
	}
	return out, nil
}

// AllocsByEval returns all the allocations by eval id
func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs.Allocation, error) {
	txn := s.db.ReadTxn()

	// Get an iterator over the eval allocations
3592 iter, err := txn.Get("allocs", "eval", evalID) 3593 if err != nil { 3594 return nil, err 3595 } 3596 3597 ws.Add(iter.WatchCh()) 3598 3599 var out []*structs.Allocation 3600 for { 3601 raw := iter.Next() 3602 if raw == nil { 3603 break 3604 } 3605 out = append(out, raw.(*structs.Allocation)) 3606 } 3607 return out, nil 3608} 3609 3610// AllocsByDeployment returns all the allocations by deployment id 3611func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) ([]*structs.Allocation, error) { 3612 txn := s.db.ReadTxn() 3613 3614 // Get an iterator over the deployments allocations 3615 iter, err := txn.Get("allocs", "deployment", deploymentID) 3616 if err != nil { 3617 return nil, err 3618 } 3619 3620 ws.Add(iter.WatchCh()) 3621 3622 var out []*structs.Allocation 3623 for { 3624 raw := iter.Next() 3625 if raw == nil { 3626 break 3627 } 3628 out = append(out, raw.(*structs.Allocation)) 3629 } 3630 return out, nil 3631} 3632 3633// Allocs returns an iterator over all the evaluations 3634func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) { 3635 txn := s.db.ReadTxn() 3636 3637 // Walk the entire table 3638 iter, err := txn.Get("allocs", "id") 3639 if err != nil { 3640 return nil, err 3641 } 3642 3643 ws.Add(iter.WatchCh()) 3644 3645 return iter, nil 3646} 3647 3648// AllocsByNamespace returns an iterator over all the allocations in the 3649// namespace 3650func (s *StateStore) AllocsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) { 3651 txn := s.db.ReadTxn() 3652 return s.allocsByNamespaceImpl(ws, txn, namespace) 3653} 3654 3655// allocsByNamespaceImpl returns an iterator over all the allocations in the 3656// namespace 3657func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *txn, namespace string) (memdb.ResultIterator, error) { 3658 // Walk the entire table 3659 iter, err := txn.Get("allocs", "namespace", namespace) 3660 if err != nil { 3661 return nil, err 3662 } 3663 3664 ws.Add(iter.WatchCh()) 3665 3666 return iter, nil 3667} 3668 3669// UpsertVaultAccessors is used to register a set of Vault Accessors 3670func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.VaultAccessor) error { 3671 txn := s.db.WriteTxn(index) 3672 defer txn.Abort() 3673 3674 for _, accessor := range accessors { 3675 // Set the create index 3676 accessor.CreateIndex = index 3677 3678 // Insert the accessor 3679 if err := txn.Insert("vault_accessors", accessor); err != nil { 3680 return fmt.Errorf("accessor insert failed: %v", err) 3681 } 3682 } 3683 3684 if err := txn.Insert("index", &IndexEntry{"vault_accessors", index}); err != nil { 3685 return fmt.Errorf("index update failed: %v", err) 3686 } 3687 3688 return txn.Commit() 3689} 3690 3691// DeleteVaultAccessors is used to delete a set of Vault Accessors 3692func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.VaultAccessor) error { 3693 txn := s.db.WriteTxn(index) 3694 defer txn.Abort() 3695 3696 // Lookup the accessor 3697 for _, accessor := range accessors { 3698 // Delete the accessor 3699 if err := txn.Delete("vault_accessors", accessor); err != nil { 3700 return fmt.Errorf("accessor delete failed: %v", err) 3701 } 3702 } 3703 3704 if err := txn.Insert("index", &IndexEntry{"vault_accessors", index}); err != nil { 3705 return fmt.Errorf("index update failed: %v", err) 3706 } 3707 3708 return txn.Commit() 3709} 3710 3711// VaultAccessor returns the given Vault accessor 3712func (s *StateStore) VaultAccessor(ws 
memdb.WatchSet, accessor string) (*structs.VaultAccessor, error) { 3713 txn := s.db.ReadTxn() 3714 3715 watchCh, existing, err := txn.FirstWatch("vault_accessors", "id", accessor) 3716 if err != nil { 3717 return nil, fmt.Errorf("accessor lookup failed: %v", err) 3718 } 3719 3720 ws.Add(watchCh) 3721 3722 if existing != nil { 3723 return existing.(*structs.VaultAccessor), nil 3724 } 3725 3726 return nil, nil 3727} 3728 3729// VaultAccessors returns an iterator of Vault accessors. 3730func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) { 3731 txn := s.db.ReadTxn() 3732 3733 iter, err := txn.Get("vault_accessors", "id") 3734 if err != nil { 3735 return nil, err 3736 } 3737 3738 ws.Add(iter.WatchCh()) 3739 3740 return iter, nil 3741} 3742 3743// VaultAccessorsByAlloc returns all the Vault accessors by alloc id 3744func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.VaultAccessor, error) { 3745 txn := s.db.ReadTxn() 3746 3747 // Get an iterator over the accessors 3748 iter, err := txn.Get("vault_accessors", "alloc_id", allocID) 3749 if err != nil { 3750 return nil, err 3751 } 3752 3753 ws.Add(iter.WatchCh()) 3754 3755 var out []*structs.VaultAccessor 3756 for { 3757 raw := iter.Next() 3758 if raw == nil { 3759 break 3760 } 3761 out = append(out, raw.(*structs.VaultAccessor)) 3762 } 3763 return out, nil 3764} 3765 3766// VaultAccessorsByNode returns all the Vault accessors by node id 3767func (s *StateStore) VaultAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.VaultAccessor, error) { 3768 txn := s.db.ReadTxn() 3769 3770 // Get an iterator over the accessors 3771 iter, err := txn.Get("vault_accessors", "node_id", nodeID) 3772 if err != nil { 3773 return nil, err 3774 } 3775 3776 ws.Add(iter.WatchCh()) 3777 3778 var out []*structs.VaultAccessor 3779 for { 3780 raw := iter.Next() 3781 if raw == nil { 3782 break 3783 } 3784 out = append(out, raw.(*structs.VaultAccessor)) 3785 } 3786 return out, nil 3787} 3788 3789func indexEntry(table string, index uint64) *IndexEntry { 3790 return &IndexEntry{ 3791 Key: table, 3792 Value: index, 3793 } 3794} 3795 3796const siTokenAccessorTable = "si_token_accessors" 3797 3798// UpsertSITokenAccessors is used to register a set of Service Identity token accessors. 3799func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error { 3800 txn := s.db.WriteTxn(index) 3801 defer txn.Abort() 3802 3803 for _, accessor := range accessors { 3804 // set the create index 3805 accessor.CreateIndex = index 3806 3807 // insert the accessor 3808 if err := txn.Insert(siTokenAccessorTable, accessor); err != nil { 3809 return errors.Wrap(err, "accessor insert failed") 3810 } 3811 } 3812 3813 // update the index for this table 3814 if err := txn.Insert("index", indexEntry(siTokenAccessorTable, index)); err != nil { 3815 return errors.Wrap(err, "index update failed") 3816 } 3817 3818 return txn.Commit() 3819} 3820 3821// DeleteSITokenAccessors is used to delete a set of Service Identity token accessors. 
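// A usage sketch (hypothetical caller, e.g. after the accessors have been
// revoked in Consul). memdb deletes by the object's indexed fields, so the
// accessors passed in should be objects previously read from this store:
//
//	accessors, _ := s.SITokenAccessorsByNode(nil, nodeID)
//	err := s.DeleteSITokenAccessors(raftIndex, accessors)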
3822func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error { 3823 txn := s.db.WriteTxn(index) 3824 defer txn.Abort() 3825 3826 // Lookup each accessor 3827 for _, accessor := range accessors { 3828 // Delete the accessor 3829 if err := txn.Delete(siTokenAccessorTable, accessor); err != nil { 3830 return errors.Wrap(err, "accessor delete failed") 3831 } 3832 } 3833 3834 // update the index for this table 3835 if err := txn.Insert("index", indexEntry(siTokenAccessorTable, index)); err != nil { 3836 return errors.Wrap(err, "index update failed") 3837 } 3838 3839 return txn.Commit() 3840} 3841 3842// SITokenAccessor returns the given Service Identity token accessor. 3843func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*structs.SITokenAccessor, error) { 3844 txn := s.db.ReadTxn() 3845 defer txn.Abort() 3846 3847 watchCh, existing, err := txn.FirstWatch(siTokenAccessorTable, "id", accessorID) 3848 if err != nil { 3849 return nil, errors.Wrap(err, "accessor lookup failed") 3850 } 3851 3852 ws.Add(watchCh) 3853 3854 if existing != nil { 3855 return existing.(*structs.SITokenAccessor), nil 3856 } 3857 3858 return nil, nil 3859} 3860 3861// SITokenAccessors returns an iterator of Service Identity token accessors. 3862func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) { 3863 txn := s.db.ReadTxn() 3864 defer txn.Abort() 3865 3866 iter, err := txn.Get(siTokenAccessorTable, "id") 3867 if err != nil { 3868 return nil, err 3869 } 3870 3871 ws.Add(iter.WatchCh()) 3872 3873 return iter, nil 3874} 3875 3876// SITokenAccessorsByAlloc returns all the Service Identity token accessors by alloc ID. 3877func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.SITokenAccessor, error) { 3878 txn := s.db.ReadTxn() 3879 defer txn.Abort() 3880 3881 // Get an iterator over the accessors 3882 iter, err := txn.Get(siTokenAccessorTable, "alloc_id", allocID) 3883 if err != nil { 3884 return nil, err 3885 } 3886 3887 ws.Add(iter.WatchCh()) 3888 3889 var result []*structs.SITokenAccessor 3890 for raw := iter.Next(); raw != nil; raw = iter.Next() { 3891 result = append(result, raw.(*structs.SITokenAccessor)) 3892 } 3893 3894 return result, nil 3895} 3896 3897// SITokenAccessorsByNode returns all the Service Identity token accessors by node ID. 
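// A blocking-query sketch (hypothetical caller): the watch set fires when the
// underlying result set changes, which is how Nomad builds blocking queries on
// top of these lookups:
//
//	ws := memdb.NewWatchSet()
//	accessors, err := s.SITokenAccessorsByNode(ws, nodeID)
//	// ... use accessors, then block until the result set may have changed:
//	_ = ws.WatchCtx(ctx)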
3898func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.SITokenAccessor, error) { 3899 txn := s.db.ReadTxn() 3900 defer txn.Abort() 3901 3902 // Get an iterator over the accessors 3903 iter, err := txn.Get(siTokenAccessorTable, "node_id", nodeID) 3904 if err != nil { 3905 return nil, err 3906 } 3907 3908 ws.Add(iter.WatchCh()) 3909 3910 var result []*structs.SITokenAccessor 3911 for raw := iter.Next(); raw != nil; raw = iter.Next() { 3912 result = append(result, raw.(*structs.SITokenAccessor)) 3913 } 3914 3915 return result, nil 3916} 3917 3918// UpdateDeploymentStatus is used to make deployment status updates and 3919// potentially make an evaluation 3920func (s *StateStore) UpdateDeploymentStatus(msgType structs.MessageType, index uint64, req *structs.DeploymentStatusUpdateRequest) error { 3921 txn := s.db.WriteTxnMsgT(msgType, index) 3922 defer txn.Abort() 3923 3924 if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil { 3925 return err 3926 } 3927 3928 // Upsert the job if necessary 3929 if req.Job != nil { 3930 if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil { 3931 return err 3932 } 3933 } 3934 3935 // Upsert the optional eval 3936 if req.Eval != nil { 3937 if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil { 3938 return err 3939 } 3940 } 3941 3942 return txn.Commit() 3943} 3944 3945// updateDeploymentStatusImpl is used to make deployment status updates 3946func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *txn) error { 3947 // Retrieve deployment 3948 ws := memdb.NewWatchSet() 3949 deployment, err := s.deploymentByIDImpl(ws, u.DeploymentID, txn) 3950 if err != nil { 3951 return err 3952 } else if deployment == nil { 3953 return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", u.DeploymentID) 3954 } else if !deployment.Active() { 3955 return fmt.Errorf("Deployment %q has terminal status %q", deployment.ID, deployment.Status) 3956 } 3957 3958 // Apply the new status 3959 copy := deployment.Copy() 3960 copy.Status = u.Status 3961 copy.StatusDescription = u.StatusDescription 3962 copy.ModifyIndex = index 3963 3964 // Insert the deployment 3965 if err := txn.Insert("deployment", copy); err != nil { 3966 return err 3967 } 3968 3969 // Update the index 3970 if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil { 3971 return fmt.Errorf("index update failed: %v", err) 3972 } 3973 3974 // If the deployment is being marked as complete, set the job to stable. 3975 if copy.Status == structs.DeploymentStatusSuccessful { 3976 if err := s.updateJobStabilityImpl(index, copy.Namespace, copy.JobID, copy.JobVersion, true, txn); err != nil { 3977 return fmt.Errorf("failed to update job stability: %v", err) 3978 } 3979 } 3980 3981 return nil 3982} 3983 3984// UpdateJobStability updates the stability of the given job and version to the 3985// desired status.
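// For example (sketch with hypothetical values), marking version 3 of job
// "web" in the default namespace stable after a successful deployment:
//
//	err := s.UpdateJobStability(raftIndex, structs.DefaultNamespace, "web", 3, true)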
3986func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, jobVersion uint64, stable bool) error { 3987 txn := s.db.WriteTxn(index) 3988 defer txn.Abort() 3989 3990 if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil { 3991 return err 3992 } 3993 3994 return txn.Commit() 3995} 3996 3997// updateJobStabilityImpl updates the stability of the given job and version 3998func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *txn) error { 3999 // Get the job that is referenced 4000 job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn) 4001 if err != nil { 4002 return err 4003 } 4004 4005 // Has already been cleared, nothing to do 4006 if job == nil { 4007 return nil 4008 } 4009 4010 // If the job already has the desired stability, nothing to do 4011 if job.Stable == stable { 4012 return nil 4013 } 4014 4015 copy := job.Copy() 4016 copy.Stable = stable 4017 return s.upsertJobImpl(index, copy, true, txn) 4018} 4019 4020// UpdateDeploymentPromotion is used to promote canaries in a deployment and 4021// potentially make an evaluation 4022func (s *StateStore) UpdateDeploymentPromotion(msgType structs.MessageType, index uint64, req *structs.ApplyDeploymentPromoteRequest) error { 4023 txn := s.db.WriteTxnMsgT(msgType, index) 4024 defer txn.Abort() 4025 4026 // Retrieve deployment and ensure it is not terminal and is active 4027 ws := memdb.NewWatchSet() 4028 deployment, err := s.deploymentByIDImpl(ws, req.DeploymentID, txn) 4029 if err != nil { 4030 return err 4031 } else if deployment == nil { 4032 return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", req.DeploymentID) 4033 } else if !deployment.Active() { 4034 return fmt.Errorf("Deployment %q has terminal status %q", deployment.ID, deployment.Status) 4035 } 4036 4037 // Retrieve affected allocations 4038 iter, err := txn.Get("allocs", "deployment", req.DeploymentID) 4039 if err != nil { 4040 return err 4041 } 4042 4043 // groupIndex is a map of groups being promoted 4044 groupIndex := make(map[string]struct{}, len(req.Groups)) 4045 for _, g := range req.Groups { 4046 groupIndex[g] = struct{}{} 4047 } 4048 4049 // canaryIndex is the set of placed canaries in the deployment 4050 canaryIndex := make(map[string]struct{}, len(deployment.TaskGroups)) 4051 for _, dstate := range deployment.TaskGroups { 4052 for _, c := range dstate.PlacedCanaries { 4053 canaryIndex[c] = struct{}{} 4054 } 4055 } 4056 4057 // healthyCounts is a mapping of group to the number of healthy canaries 4058 healthyCounts := make(map[string]int, len(deployment.TaskGroups)) 4059 4060 // promotable is the set of allocations that we can move from canary to 4061 // non-canary 4062 var promotable []*structs.Allocation 4063 4064 for { 4065 raw := iter.Next() 4066 if raw == nil { 4067 break 4068 } 4069 4070 alloc := raw.(*structs.Allocation) 4071 4072 // Check that the alloc is a canary 4073 if _, ok := canaryIndex[alloc.ID]; !ok { 4074 continue 4075 } 4076 4077 // Check that the canary is part of a group being promoted 4078 if _, ok := groupIndex[alloc.TaskGroup]; !req.All && !ok { 4079 continue 4080 } 4081 4082 // Ensure the canaries are healthy 4083 if alloc.TerminalStatus() || !alloc.DeploymentStatus.IsHealthy() { 4084 continue 4085 } 4086 4087 healthyCounts[alloc.TaskGroup]++ 4088 promotable = append(promotable, alloc) 4089 } 4090 4091 // Determine if we have enough healthy allocations 4092 var unhealthyErr
multierror.Error 4093 for tg, dstate := range deployment.TaskGroups { 4094 if _, ok := groupIndex[tg]; !req.All && !ok { 4095 continue 4096 } 4097 4098 need := dstate.DesiredCanaries 4099 if need == 0 { 4100 continue 4101 } 4102 4103 if have := healthyCounts[tg]; have < need { 4104 multierror.Append(&unhealthyErr, fmt.Errorf("Task group %q has %d/%d healthy allocations", tg, have, need)) 4105 } 4106 } 4107 4108 if err := unhealthyErr.ErrorOrNil(); err != nil { 4109 return err 4110 } 4111 4112 // Update deployment 4113 copy := deployment.Copy() 4114 copy.ModifyIndex = index 4115 for tg, status := range copy.TaskGroups { 4116 _, ok := groupIndex[tg] 4117 if !req.All && !ok { 4118 continue 4119 } 4120 4121 // reset the progress deadline 4122 if status.ProgressDeadline > 0 && !status.RequireProgressBy.IsZero() { 4123 status.RequireProgressBy = time.Now().Add(status.ProgressDeadline) 4124 } 4125 status.Promoted = true 4126 } 4127 4128 // If the deployment no longer needs promotion, update its status 4129 if !copy.RequiresPromotion() && copy.Status == structs.DeploymentStatusRunning { 4130 copy.StatusDescription = structs.DeploymentStatusDescriptionRunning 4131 } 4132 4133 // Insert the deployment 4134 if err := s.upsertDeploymentImpl(index, copy, txn); err != nil { 4135 return err 4136 } 4137 4138 // Upsert the optional eval 4139 if req.Eval != nil { 4140 if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil { 4141 return err 4142 } 4143 } 4144 4145 // For each promotable allocation remove the canary field 4146 for _, alloc := range promotable { 4147 promoted := alloc.Copy() 4148 promoted.DeploymentStatus.Canary = false 4149 promoted.DeploymentStatus.ModifyIndex = index 4150 promoted.ModifyIndex = index 4151 promoted.AllocModifyIndex = index 4152 4153 if err := txn.Insert("allocs", promoted); err != nil { 4154 return fmt.Errorf("alloc insert failed: %v", err) 4155 } 4156 } 4157 4158 // Update the alloc index 4159 if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { 4160 return fmt.Errorf("index update failed: %v", err) 4161 } 4162 4163 return txn.Commit() 4164} 4165 4166// UpdateDeploymentAllocHealth is used to update the health of allocations as 4167// part of the deployment and potentially make an evaluation 4168func (s *StateStore) UpdateDeploymentAllocHealth(msgType structs.MessageType, index uint64, req *structs.ApplyDeploymentAllocHealthRequest) error { 4169 txn := s.db.WriteTxnMsgT(msgType, index) 4170 defer txn.Abort() 4171 4172 // Retrieve deployment and ensure it is not terminal and is active 4173 ws := memdb.NewWatchSet() 4174 deployment, err := s.deploymentByIDImpl(ws, req.DeploymentID, txn) 4175 if err != nil { 4176 return err 4177 } else if deployment == nil { 4178 return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", req.DeploymentID) 4179 } else if !deployment.Active() { 4180 return fmt.Errorf("Deployment %q has terminal status %q", deployment.ID, deployment.Status) 4181 } 4182 4183 // Update the health status of each allocation 4184 if total := len(req.HealthyAllocationIDs) + len(req.UnhealthyAllocationIDs); total != 0 { 4185 setAllocHealth := func(id string, healthy bool, ts time.Time) error { 4186 existing, err := txn.First("allocs", "id", id) 4187 if err != nil { 4188 return fmt.Errorf("alloc %q lookup failed: %v", id, err) 4189 } 4190 if existing == nil { 4191 return fmt.Errorf("unknown alloc %q", id) 4192 } 4193 4194 old := existing.(*structs.Allocation) 4195 if old.DeploymentID != req.DeploymentID { 4196 return
fmt.Errorf("alloc %q is not part of deployment %q", id, req.DeploymentID) 4197 } 4198 4199 // Set the health 4200 copy := old.Copy() 4201 if copy.DeploymentStatus == nil { 4202 copy.DeploymentStatus = &structs.AllocDeploymentStatus{} 4203 } 4204 copy.DeploymentStatus.Healthy = helper.BoolToPtr(healthy) 4205 copy.DeploymentStatus.Timestamp = ts 4206 copy.DeploymentStatus.ModifyIndex = index 4207 copy.ModifyIndex = index 4208 4209 if err := s.updateDeploymentWithAlloc(index, copy, old, txn); err != nil { 4210 return fmt.Errorf("error updating deployment: %v", err) 4211 } 4212 4213 if err := txn.Insert("allocs", copy); err != nil { 4214 return fmt.Errorf("alloc insert failed: %v", err) 4215 } 4216 4217 return nil 4218 } 4219 4220 for _, id := range req.HealthyAllocationIDs { 4221 if err := setAllocHealth(id, true, req.Timestamp); err != nil { 4222 return err 4223 } 4224 } 4225 for _, id := range req.UnhealthyAllocationIDs { 4226 if err := setAllocHealth(id, false, req.Timestamp); err != nil { 4227 return err 4228 } 4229 } 4230 4231 // Update the indexes 4232 if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { 4233 return fmt.Errorf("index update failed: %v", err) 4234 } 4235 } 4236 4237 // Update the deployment status as needed. 4238 if req.DeploymentUpdate != nil { 4239 if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil { 4240 return err 4241 } 4242 } 4243 4244 // Upsert the job if necessary 4245 if req.Job != nil { 4246 if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil { 4247 return err 4248 } 4249 } 4250 4251 // Upsert the optional eval 4252 if req.Eval != nil { 4253 if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil { 4254 return err 4255 } 4256 } 4257 4258 return txn.Commit() 4259} 4260 4261// LatestIndex returns the greatest index value for all indexes 4262func (s *StateStore) LatestIndex() (uint64, error) { 4263 indexes, err := s.Indexes() 4264 if err != nil { 4265 return 0, err 4266 } 4267 4268 var max uint64 4269 for { 4270 raw := indexes.Next() 4271 if raw == nil { 4272 break 4273 } 4274 4275 // Cast to an IndexEntry 4276 idx := raw.(*IndexEntry) 4277 4278 // Determine the max 4279 if idx.Value > max { 4280 max = idx.Value 4281 } 4282 } 4283 4284 return max, nil 4285} 4286 4287// Index finds the matching index value 4288func (s *StateStore) Index(name string) (uint64, error) { 4289 txn := s.db.ReadTxn() 4290 4291 // Lookup the first matching index 4292 out, err := txn.First("index", "id", name) 4293 if err != nil { 4294 return 0, err 4295 } 4296 if out == nil { 4297 return 0, nil 4298 } 4299 return out.(*IndexEntry).Value, nil 4300} 4301 4302// Indexes returns an iterator over all the indexes 4303func (s *StateStore) Indexes() (memdb.ResultIterator, error) { 4304 txn := s.db.ReadTxn() 4305 4306 // Walk the entire index table 4307 iter, err := txn.Get("index", "id") 4308 if err != nil { 4309 return nil, err 4310 } 4311 return iter, nil 4312} 4313 4314// ReconcileJobSummaries re-creates summaries for all jobs present in the state 4315// store 4316func (s *StateStore) ReconcileJobSummaries(index uint64) error { 4317 txn := s.db.WriteTxn(index) 4318 defer txn.Abort() 4319 4320 // Get all the jobs 4321 iter, err := txn.Get("jobs", "id") 4322 if err != nil { 4323 return err 4324 } 4325 // COMPAT: Remove after 0.11 4326 // Iterate over jobs to build a list of parent jobs and their children 4327 parentMap := make(map[string][]*structs.Job) 4328 for { 4329 rawJob := iter.Next() 4330 if rawJob == nil {
4331 break 4332 } 4333 job := rawJob.(*structs.Job) 4334 if job.ParentID != "" { 4335 children := parentMap[job.ParentID] 4336 children = append(children, job) 4337 parentMap[job.ParentID] = children 4338 } 4339 } 4340 4341 // Get all the jobs again 4342 iter, err = txn.Get("jobs", "id") 4343 if err != nil { 4344 return err 4345 } 4346 4347 for { 4348 rawJob := iter.Next() 4349 if rawJob == nil { 4350 break 4351 } 4352 job := rawJob.(*structs.Job) 4353 4354 if job.IsParameterized() || job.IsPeriodic() { 4355 // COMPAT: Remove after 0.11 4356 4357 // The following block of code fixes incorrect child summaries due to a bug 4358 // See https://github.com/hashicorp/nomad/issues/3886 for details 4359 rawSummary, err := txn.First("job_summary", "id", job.Namespace, job.ID) 4360 if err != nil { 4361 return err 4362 } 4363 if rawSummary == nil { 4364 continue 4365 } 4366 4367 oldSummary := rawSummary.(*structs.JobSummary) 4368 4369 // Create an empty summary 4370 summary := &structs.JobSummary{ 4371 JobID: job.ID, 4372 Namespace: job.Namespace, 4373 Summary: make(map[string]structs.TaskGroupSummary), 4374 Children: &structs.JobChildrenSummary{}, 4375 } 4376 4377 // Iterate over children of this job if any to fix summary counts 4378 children := parentMap[job.ID] 4379 for _, childJob := range children { 4380 switch childJob.Status { 4381 case structs.JobStatusPending: 4382 summary.Children.Pending++ 4383 case structs.JobStatusDead: 4384 summary.Children.Dead++ 4385 case structs.JobStatusRunning: 4386 summary.Children.Running++ 4387 } 4388 } 4389 4390 // Insert the job summary if it's different 4391 if !reflect.DeepEqual(summary, oldSummary) { 4392 // Set the create index of the summary to the job's create index 4393 // and the modify index to the current index 4394 summary.CreateIndex = job.CreateIndex 4395 summary.ModifyIndex = index 4396 4397 if err := txn.Insert("job_summary", summary); err != nil { 4398 return fmt.Errorf("error inserting job summary: %v", err) 4399 } 4400 } 4401 4402 // Done with handling a parent job, continue to next 4403 continue 4404 } 4405 4406 // Create a job summary for the job 4407 summary := &structs.JobSummary{ 4408 JobID: job.ID, 4409 Namespace: job.Namespace, 4410 Summary: make(map[string]structs.TaskGroupSummary), 4411 } 4412 for _, tg := range job.TaskGroups { 4413 summary.Summary[tg.Name] = structs.TaskGroupSummary{} 4414 } 4415 4416 // Find all the allocations for the job 4417 iterAllocs, err := txn.Get("allocs", "job", job.Namespace, job.ID) 4418 if err != nil { 4419 return err 4420 } 4421 4422 // Calculate the summary for the job 4423 for { 4424 rawAlloc := iterAllocs.Next() 4425 if rawAlloc == nil { 4426 break 4427 } 4428 alloc := rawAlloc.(*structs.Allocation) 4429 4430 // Ignore the allocation if it doesn't belong to the currently 4431 // registered job.
The allocation is checked because of issue #2304 4432 if alloc.Job == nil || alloc.Job.CreateIndex != job.CreateIndex { 4433 continue 4434 } 4435 4436 tg := summary.Summary[alloc.TaskGroup] 4437 switch alloc.ClientStatus { 4438 case structs.AllocClientStatusFailed: 4439 tg.Failed += 1 4440 case structs.AllocClientStatusLost: 4441 tg.Lost += 1 4442 case structs.AllocClientStatusComplete: 4443 tg.Complete += 1 4444 case structs.AllocClientStatusRunning: 4445 tg.Running += 1 4446 case structs.AllocClientStatusPending: 4447 tg.Starting += 1 4448 default: 4449 s.logger.Error("invalid client status set on allocation", "client_status", alloc.ClientStatus, "alloc_id", alloc.ID) 4450 } 4451 summary.Summary[alloc.TaskGroup] = tg 4452 } 4453 4454 // Set the create index of the summary same as the job's create index 4455 // and the modify index to the current index 4456 summary.CreateIndex = job.CreateIndex 4457 summary.ModifyIndex = index 4458 4459 // Insert the job summary 4460 if err := txn.Insert("job_summary", summary); err != nil { 4461 return fmt.Errorf("error inserting job summary: %v", err) 4462 } 4463 } 4464 4465 // Update the indexes table for job summary 4466 if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { 4467 return fmt.Errorf("index update failed: %v", err) 4468 } 4469 return txn.Commit() 4470} 4471 4472// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. 4473// It takes a map of job IDs to an optional forceStatus string. It returns an 4474// error if the job doesn't exist or setJobStatus fails. 4475func (s *StateStore) setJobStatuses(index uint64, txn *txn, 4476 jobs map[structs.NamespacedID]string, evalDelete bool) error { 4477 for tuple, forceStatus := range jobs { 4478 4479 existing, err := txn.First("jobs", "id", tuple.Namespace, tuple.ID) 4480 if err != nil { 4481 return fmt.Errorf("job lookup failed: %v", err) 4482 } 4483 4484 if existing == nil { 4485 continue 4486 } 4487 4488 if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil { 4489 return err 4490 } 4491 4492 } 4493 4494 return nil 4495} 4496 4497// setJobStatus sets the status of the job by looking up associated evaluations 4498// and allocations. evalDelete should be set to true if setJobStatus is being 4499// called because an evaluation is being deleted (potentially because of garbage 4500// collection). If forceStatus is non-empty, the job's status will be set to the 4501// passed status. 4502func (s *StateStore) setJobStatus(index uint64, txn *txn, 4503 job *structs.Job, evalDelete bool, forceStatus string) error { 4504 4505 // Capture the current status so we can check if there is a change 4506 oldStatus := job.Status 4507 newStatus := forceStatus 4508 4509 // If forceStatus is not set, compute the jobs status. 4510 if forceStatus == "" { 4511 var err error 4512 newStatus, err = s.getJobStatus(txn, job, evalDelete) 4513 if err != nil { 4514 return err 4515 } 4516 } 4517 4518 // Fast-path if the job has not changed. 
4519 if oldStatus == newStatus { 4520 return nil 4521 } 4522 4523 // Copy and update the existing job 4524 updated := job.Copy() 4525 updated.Status = newStatus 4526 updated.ModifyIndex = index 4527 4528 // Insert the job 4529 if err := txn.Insert("jobs", updated); err != nil { 4530 return fmt.Errorf("job insert failed: %v", err) 4531 } 4532 if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil { 4533 return fmt.Errorf("index update failed: %v", err) 4534 } 4535 4536 // Update the children summary 4537 if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus); err != nil { 4538 return fmt.Errorf("job summary update failed: %w", err) 4539 } 4540 return nil 4541} 4542 4543func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string) error { 4544 if updated.ParentID == "" { 4545 return nil 4546 } 4547 4548 // Try to update the parent job's summary 4549 summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID) 4550 if err != nil { 4551 return fmt.Errorf("unable to retrieve summary for parent job: %v", err) 4552 } 4553 4554 // Only continue if the summary exists. It may not exist if the parent 4555 // job was removed 4556 if summaryRaw != nil { 4557 existing := summaryRaw.(*structs.JobSummary) 4558 pSummary := existing.Copy() 4559 if pSummary.Children == nil { 4560 pSummary.Children = new(structs.JobChildrenSummary) 4561 } 4562 4563 // Determine the transition and update the correct fields 4564 children := pSummary.Children 4565 4566 // Decrement old status 4567 if oldStatus != "" { 4568 switch oldStatus { 4569 case structs.JobStatusPending: 4570 children.Pending-- 4571 case structs.JobStatusRunning: 4572 children.Running-- 4573 case structs.JobStatusDead: 4574 children.Dead-- 4575 default: 4576 return fmt.Errorf("unknown old job status %q", oldStatus) 4577 } 4578 } 4579 4580 // Increment new status 4581 switch newStatus { 4582 case structs.JobStatusPending: 4583 children.Pending++ 4584 case structs.JobStatusRunning: 4585 children.Running++ 4586 case structs.JobStatusDead: 4587 children.Dead++ 4588 default: 4589 return fmt.Errorf("unknown new job status %q", newStatus) 4590 } 4591 4592 // Update the index 4593 pSummary.ModifyIndex = index 4594 4595 // Insert the summary 4596 if err := txn.Insert("job_summary", pSummary); err != nil { 4597 return fmt.Errorf("job summary insert failed: %v", err) 4598 } 4599 if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { 4600 return fmt.Errorf("index update failed: %v", err) 4601 } 4602 } 4603 return nil 4604} 4605 4606func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) { 4607 // System, Periodic and Parameterized jobs are running until explicitly 4608 // stopped 4609 if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() { 4610 if job.Stop { 4611 return structs.JobStatusDead, nil 4612 } 4613 4614 return structs.JobStatusRunning, nil 4615 } 4616 4617 allocs, err := txn.Get("allocs", "job", job.Namespace, job.ID) 4618 if err != nil { 4619 return "", err 4620 } 4621 4622 // If there is a non-terminal allocation, the job is running.
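	// Decision summary for the logic below (sketch):
	//   any non-terminal alloc              -> running
	//   else any non-terminal eval          -> pending
	//   evals or allocs exist, all terminal -> dead
	//   nothing recorded at all             -> pending, unless evalDelete is
	//   set (the caller is garbage collecting evals), in which case -> dead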
4623 hasAlloc := false 4624 for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() { 4625 hasAlloc = true 4626 if !alloc.(*structs.Allocation).TerminalStatus() { 4627 return structs.JobStatusRunning, nil 4628 } 4629 } 4630 4631 evals, err := txn.Get("evals", "job_prefix", job.Namespace, job.ID) 4632 if err != nil { 4633 return "", err 4634 } 4635 4636 hasEval := false 4637 for raw := evals.Next(); raw != nil; raw = evals.Next() { 4638 e := raw.(*structs.Evaluation) 4639 4640 // Filter non-exact matches 4641 if e.JobID != job.ID { 4642 continue 4643 } 4644 4645 hasEval = true 4646 if !e.TerminalStatus() { 4647 return structs.JobStatusPending, nil 4648 } 4649 } 4650 4651 // The job is dead if all the allocations and evals are terminal or if there 4652 // are no evals because of garbage collection. 4653 if evalDelete || hasEval || hasAlloc { 4654 return structs.JobStatusDead, nil 4655 } 4656 4657 return structs.JobStatusPending, nil 4658} 4659 4660// updateSummaryWithJob creates or updates job summaries when new jobs are 4661// upserted or existing ones are updated 4662func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, 4663 txn *txn) error { 4664 4665 // Update the job summary 4666 summaryRaw, err := txn.First("job_summary", "id", job.Namespace, job.ID) 4667 if err != nil { 4668 return fmt.Errorf("job summary lookup failed: %v", err) 4669 } 4670 4671 // Get the summary or create if necessary 4672 var summary *structs.JobSummary 4673 hasSummaryChanged := false 4674 if summaryRaw != nil { 4675 summary = summaryRaw.(*structs.JobSummary).Copy() 4676 } else { 4677 summary = &structs.JobSummary{ 4678 JobID: job.ID, 4679 Namespace: job.Namespace, 4680 Summary: make(map[string]structs.TaskGroupSummary), 4681 Children: new(structs.JobChildrenSummary), 4682 CreateIndex: index, 4683 } 4684 hasSummaryChanged = true 4685 } 4686 4687 for _, tg := range job.TaskGroups { 4688 if _, ok := summary.Summary[tg.Name]; !ok { 4689 newSummary := structs.TaskGroupSummary{ 4690 Complete: 0, 4691 Failed: 0, 4692 Running: 0, 4693 Starting: 0, 4694 } 4695 summary.Summary[tg.Name] = newSummary 4696 hasSummaryChanged = true 4697 } 4698 } 4699 4700 // The job summary has changed, so update the modify index. 
4701 if hasSummaryChanged { 4702 summary.ModifyIndex = index 4703 4704 // Update the indexes table for job summary 4705 if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { 4706 return fmt.Errorf("index update failed: %v", err) 4707 } 4708 if err := txn.Insert("job_summary", summary); err != nil { 4709 return err 4710 } 4711 } 4712 4713 return nil 4714} 4715 4716// updateJobScalingPolicies upserts any scaling policies contained in the job and deletes 4717// any previously stored scaling policies that are no longer present in the job 4718func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error { 4719 4720 ws := memdb.NewWatchSet() 4721 4722 scalingPolicies := job.GetScalingPolicies() 4723 newTargets := map[string]bool{} 4724 for _, p := range scalingPolicies { 4725 newTargets[p.JobKey()] = true 4726 } 4727 // find existing policies that need to be deleted 4728 deletedPolicies := []string{} 4729 iter, err := s.ScalingPoliciesByJobTxn(ws, job.Namespace, job.ID, txn) 4730 if err != nil { 4731 return fmt.Errorf("ScalingPoliciesByJob lookup failed: %v", err) 4732 } 4733 for raw := iter.Next(); raw != nil; raw = iter.Next() { 4734 oldPolicy := raw.(*structs.ScalingPolicy) 4735 if !newTargets[oldPolicy.JobKey()] { 4736 deletedPolicies = append(deletedPolicies, oldPolicy.ID) 4737 } 4738 } 4739 err = s.DeleteScalingPoliciesTxn(index, deletedPolicies, txn) 4740 if err != nil { 4741 return fmt.Errorf("DeleteScalingPolicies of removed policies failed: %v", err) 4742 } 4743 4744 err = s.UpsertScalingPoliciesTxn(index, scalingPolicies, txn) 4745 if err != nil { 4746 return fmt.Errorf("UpsertScalingPolicies of policies failed: %v", err) 4747 } 4748 4749 return nil 4750} 4751 4752// updateJobCSIPlugins runs on job update and re-indexes the job in the affected CSI plugins 4753func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, txn *txn) error { 4754 plugIns := make(map[string]*structs.CSIPlugin) 4755 4756 loop := func(job *structs.Job, delete bool) error { 4757 for _, tg := range job.TaskGroups { 4758 for _, t := range tg.Tasks { 4759 if t.CSIPluginConfig == nil { 4760 continue 4761 } 4762 4763 plugIn, ok := plugIns[t.CSIPluginConfig.ID] 4764 if !ok { 4765 p, err := s.CSIPluginByIDTxn(txn, nil, t.CSIPluginConfig.ID) 4766 if err != nil { 4767 return err 4768 } 4769 if p == nil { 4770 plugIn = structs.NewCSIPlugin(t.CSIPluginConfig.ID, index) 4771 } else { 4772 plugIn = p.Copy() 4773 plugIn.ModifyIndex = index 4774 } 4775 plugIns[plugIn.ID] = plugIn 4776 } 4777 4778 if delete { 4779 plugIn.DeleteJob(job, nil) 4780 } else { 4781 plugIn.AddJob(job, nil) 4782 } 4783 } 4784 } 4785 4786 return nil 4787 } 4788 4789 if prev != nil { 4790 err := loop(prev, true) 4791 if err != nil { 4792 return err 4793 } 4794 } 4795 4796 err := loop(job, false) 4797 if err != nil { 4798 return err 4799 } 4800 4801 for _, plugIn := range plugIns { 4802 err = txn.Insert("csi_plugins", plugIn) 4803 if err != nil { 4804 return fmt.Errorf("csi_plugins insert error: %v", err) 4805 } 4806 } 4807 4808 if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { 4809 return fmt.Errorf("index update failed: %v", err) 4810 } 4811 4812 return nil 4813} 4814 4815// updateDeploymentWithAlloc is used to update the deployment state associated 4816// with the given allocation. The passed alloc may be updated if the deployment 4817// status has changed to capture the modify index at which it has changed.
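// Accounting sketch for the per-task-group counters maintained below:
//
//	new alloc, or alloc moved to this deployment -> placed++
//	health reported for the first time           -> healthy++ or unhealthy++
//	previously healthy, now reported unhealthy   -> healthy--, unhealthy++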
4818func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *txn) error { 4819 // Nothing to do if the allocation is not associated with a deployment 4820 if alloc.DeploymentID == "" { 4821 return nil 4822 } 4823 4824 // Get the deployment 4825 ws := memdb.NewWatchSet() 4826 deployment, err := s.deploymentByIDImpl(ws, alloc.DeploymentID, txn) 4827 if err != nil { 4828 return err 4829 } 4830 if deployment == nil { 4831 return nil 4832 } 4833 4834 // Retrieve the deployment state object 4835 _, ok := deployment.TaskGroups[alloc.TaskGroup] 4836 if !ok { 4837 // If the task group isn't part of the deployment, the task group wasn't 4838 // part of a rolling update so nothing to do 4839 return nil 4840 } 4841 4842 // Do not modify in-place. Instead keep track of what must be done 4843 placed := 0 4844 healthy := 0 4845 unhealthy := 0 4846 4847 // If there was no existing allocation, this is a placement and we increment 4848 // the placement 4849 existingHealthSet := existing != nil && existing.DeploymentStatus.HasHealth() 4850 allocHealthSet := alloc.DeploymentStatus.HasHealth() 4851 if existing == nil || existing.DeploymentID != alloc.DeploymentID { 4852 placed++ 4853 } else if !existingHealthSet && allocHealthSet { 4854 if *alloc.DeploymentStatus.Healthy { 4855 healthy++ 4856 } else { 4857 unhealthy++ 4858 } 4859 } else if existingHealthSet && allocHealthSet { 4860 // See if it has gone from healthy to unhealthy 4861 if *existing.DeploymentStatus.Healthy && !*alloc.DeploymentStatus.Healthy { 4862 healthy-- 4863 unhealthy++ 4864 } 4865 } 4866 4867 // Nothing to do 4868 if placed == 0 && healthy == 0 && unhealthy == 0 { 4869 return nil 4870 } 4871 4872 // Update the allocation's deployment status modify index 4873 if alloc.DeploymentStatus != nil && healthy+unhealthy != 0 { 4874 alloc.DeploymentStatus.ModifyIndex = index 4875 } 4876 4877 // Create a copy of the deployment object 4878 deploymentCopy := deployment.Copy() 4879 deploymentCopy.ModifyIndex = index 4880 4881 dstate := deploymentCopy.TaskGroups[alloc.TaskGroup] 4882 dstate.PlacedAllocs += placed 4883 dstate.HealthyAllocs += healthy 4884 dstate.UnhealthyAllocs += unhealthy 4885 4886 // Ensure PlacedCanaries accurately reflects the alloc canary status 4887 if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary { 4888 found := false 4889 for _, canary := range dstate.PlacedCanaries { 4890 if alloc.ID == canary { 4891 found = true 4892 break 4893 } 4894 } 4895 if !found { 4896 dstate.PlacedCanaries = append(dstate.PlacedCanaries, alloc.ID) 4897 } 4898 } 4899 4900 // Update the progress deadline 4901 if pd := dstate.ProgressDeadline; pd != 0 { 4902 // If we are the first placed allocation for the deployment start the progress deadline. 4903 if placed != 0 && dstate.RequireProgressBy.IsZero() { 4904 // Use modify time instead of create time because we may in-place 4905 // update the allocation to be part of a new deployment. 
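	// For example (illustrative numbers): with a 10 minute ProgressDeadline
	// and a first placement whose ModifyTime is t0, the deployment must show
	// progress by t0+10m; the healthy branch below can then push the deadline
	// out to each health report's timestamp plus the deadline.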
4906 dstate.RequireProgressBy = time.Unix(0, alloc.ModifyTime).Add(pd) 4907 } else if healthy != 0 { 4908 if d := alloc.DeploymentStatus.Timestamp.Add(pd); d.After(dstate.RequireProgressBy) { 4909 dstate.RequireProgressBy = d 4910 } 4911 } 4912 } 4913 4914 // Upsert the deployment 4915 if err := s.upsertDeploymentImpl(index, deploymentCopy, txn); err != nil { 4916 return err 4917 } 4918 4919 return nil 4920} 4921 4922// updateSummaryWithAlloc updates the job summary when allocations are updated 4923// or inserted 4924func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation, 4925 existingAlloc *structs.Allocation, txn *txn) error { 4926 4927 // We don't have to update the summary if the job is missing 4928 if alloc.Job == nil { 4929 return nil 4930 } 4931 4932 summaryRaw, err := txn.First("job_summary", "id", alloc.Namespace, alloc.JobID) 4933 if err != nil { 4934 return fmt.Errorf("unable to lookup job summary for job id %q in namespace %q: %v", alloc.JobID, alloc.Namespace, err) 4935 } 4936 4937 if summaryRaw == nil { 4938 // Check if the job is de-registered 4939 rawJob, err := txn.First("jobs", "id", alloc.Namespace, alloc.JobID) 4940 if err != nil { 4941 return fmt.Errorf("unable to query job: %v", err) 4942 } 4943 4944 // If the job is de-registered then we skip updating its summary 4945 if rawJob == nil { 4946 return nil 4947 } 4948 4949 return fmt.Errorf("job summary for job %q in namespace %q is not present", alloc.JobID, alloc.Namespace) 4950 } 4951 4952 // Get a copy of the existing summary 4953 jobSummary := summaryRaw.(*structs.JobSummary).Copy() 4954 4955 // Not updating the job summary because the allocation doesn't belong to the 4956 // currently registered job 4957 if jobSummary.CreateIndex != alloc.Job.CreateIndex { 4958 return nil 4959 } 4960 4961 tgSummary, ok := jobSummary.Summary[alloc.TaskGroup] 4962 if !ok { 4963 return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) 4964 } 4965 4966 summaryChanged := false 4967 if existingAlloc == nil { 4968 switch alloc.DesiredStatus { 4969 case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: 4970 s.logger.Error("new allocation inserted into state store with bad desired status", 4971 "alloc_id", alloc.ID, "desired_status", alloc.DesiredStatus) 4972 } 4973 switch alloc.ClientStatus { 4974 case structs.AllocClientStatusPending: 4975 tgSummary.Starting += 1 4976 if tgSummary.Queued > 0 { 4977 tgSummary.Queued -= 1 4978 } 4979 summaryChanged = true 4980 case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, 4981 structs.AllocClientStatusComplete: 4982 s.logger.Error("new allocation inserted into state store with bad client status", 4983 "alloc_id", alloc.ID, "client_status", alloc.ClientStatus) 4984 } 4985 } else if existingAlloc.ClientStatus != alloc.ClientStatus { 4986 // Increment the count of the bin for the current state 4987 switch alloc.ClientStatus { 4988 case structs.AllocClientStatusRunning: 4989 tgSummary.Running += 1 4990 case structs.AllocClientStatusFailed: 4991 tgSummary.Failed += 1 4992 case structs.AllocClientStatusPending: 4993 tgSummary.Starting += 1 4994 case structs.AllocClientStatusComplete: 4995 tgSummary.Complete += 1 4996 case structs.AllocClientStatusLost: 4997 tgSummary.Lost += 1 4998 } 4999 5000 // Decrement the count of the bin for the previous state 5001 switch existingAlloc.ClientStatus { 5002 case structs.AllocClientStatusRunning: 5003 if tgSummary.Running > 0 { 5004 tgSummary.Running -= 1 5005 } 5006 case
structs.AllocClientStatusPending: 5007 if tgSummary.Starting > 0 { 5008 tgSummary.Starting -= 1 5009 } 5010 case structs.AllocClientStatusLost: 5011 if tgSummary.Lost > 0 { 5012 tgSummary.Lost -= 1 5013 } 5014 case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: 5015 default: 5016 s.logger.Error("invalid old client status for allocation", 5017 "alloc_id", existingAlloc.ID, "client_status", existingAlloc.ClientStatus) 5018 } 5019 summaryChanged = true 5020 } 5021 jobSummary.Summary[alloc.TaskGroup] = tgSummary 5022 5023 if summaryChanged { 5024 jobSummary.ModifyIndex = index 5025 5026 if err := s.updatePluginWithJobSummary(index, jobSummary, alloc, txn); err != nil { return fmt.Errorf("plugin update with job summary failed: %v", err) } 5027 5028 // Update the indexes table for job summary 5029 if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { 5030 return fmt.Errorf("index update failed: %v", err) 5031 } 5032 5033 if err := txn.Insert("job_summary", jobSummary); err != nil { 5034 return fmt.Errorf("updating job summary failed: %v", err) 5035 } 5036 } 5037 5038 return nil 5039} 5040 5041// updatePluginWithAlloc updates the CSI plugins for an alloc when the 5042// allocation is updated or inserted with a terminal server status. 5043func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation, 5044 txn *txn) error { 5045 if !alloc.ServerTerminalStatus() { 5046 return nil 5047 } 5048 5049 tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) 5050 for _, t := range tg.Tasks { 5051 if t.CSIPluginConfig != nil { 5052 pluginID := t.CSIPluginConfig.ID 5053 plug, err := s.CSIPluginByIDTxn(txn, nil, pluginID) 5054 if err != nil { 5055 return err 5056 } 5057 if plug == nil { 5058 // plugin may not have been created because it never 5059 // became healthy, just move on 5060 return nil 5061 } 5062 plug = plug.Copy() 5063 err = plug.DeleteAlloc(alloc.ID, alloc.NodeID) 5064 if err != nil { 5065 return err 5066 } 5067 err = updateOrGCPlugin(index, txn, plug) 5068 if err != nil { 5069 return err 5070 } 5071 } 5072 } 5073 5074 return nil 5075} 5076 5077// updatePluginWithJobSummary updates the CSI plugins for a job when the 5078// job summary is updated by an alloc 5079func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.JobSummary, alloc *structs.Allocation, 5080 txn *txn) error { 5081 5082 tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) 5083 if tg == nil { 5084 return nil 5085 } 5086 5087 for _, t := range tg.Tasks { 5088 if t.CSIPluginConfig != nil { 5089 pluginID := t.CSIPluginConfig.ID 5090 plug, err := s.CSIPluginByIDTxn(txn, nil, pluginID) 5091 if err != nil { 5092 return err 5093 } 5094 if plug == nil { 5095 plug = structs.NewCSIPlugin(pluginID, index) 5096 } else { 5097 plug = plug.Copy() 5098 } 5099 5100 plug.UpdateExpectedWithJob(alloc.Job, summary, alloc.ServerTerminalStatus()) 5101 err = updateOrGCPlugin(index, txn, plug) 5102 if err != nil { 5103 return err 5104 } 5105 } 5106 } 5107 5108 return nil 5109} 5110 5111// UpsertACLPolicies is used to create or update a set of ACL policies 5112func (s *StateStore) UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error { 5113 txn := s.db.WriteTxnMsgT(msgType, index) 5114 defer txn.Abort() 5115 5116 for _, policy := range policies { 5117 // Ensure the policy hash is non-nil. This should be done outside the state store 5118 // for performance reasons, but we check here for defense in depth.
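	// A sketch of what callers would normally do before submitting the Raft
	// request (hypothetical call site):
	//
	//	for _, p := range policies {
	//		p.SetHash()
	//	}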
5119 if len(policy.Hash) == 0 { 5120 policy.SetHash() 5121 } 5122 5123 // Check if the policy already exists 5124 existing, err := txn.First("acl_policy", "id", policy.Name) 5125 if err != nil { 5126 return fmt.Errorf("policy lookup failed: %v", err) 5127 } 5128 5129 // Update all the indexes 5130 if existing != nil { 5131 policy.CreateIndex = existing.(*structs.ACLPolicy).CreateIndex 5132 policy.ModifyIndex = index 5133 } else { 5134 policy.CreateIndex = index 5135 policy.ModifyIndex = index 5136 } 5137 5138 // Update the policy 5139 if err := txn.Insert("acl_policy", policy); err != nil { 5140 return fmt.Errorf("upserting policy failed: %v", err) 5141 } 5142 } 5143 5144 // Update the indexes table 5145 if err := txn.Insert("index", &IndexEntry{"acl_policy", index}); err != nil { 5146 return fmt.Errorf("index update failed: %v", err) 5147 } 5148 5149 return txn.Commit() 5150} 5151 5152// DeleteACLPolicies deletes the policies with the given names 5153func (s *StateStore) DeleteACLPolicies(msgType structs.MessageType, index uint64, names []string) error { 5154 txn := s.db.WriteTxnMsgT(msgType, index) 5155 defer txn.Abort() 5156 5157 // Delete each policy 5158 for _, name := range names { 5159 if _, err := txn.DeleteAll("acl_policy", "id", name); err != nil { 5160 return fmt.Errorf("deleting acl policy failed: %v", err) 5161 } 5162 } 5163 if err := txn.Insert("index", &IndexEntry{"acl_policy", index}); err != nil { 5164 return fmt.Errorf("index update failed: %v", err) 5165 } 5166 return txn.Commit() 5167} 5168 5169// ACLPolicyByName is used to lookup a policy by name 5170func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.ACLPolicy, error) { 5171 txn := s.db.ReadTxn() 5172 5173 watchCh, existing, err := txn.FirstWatch("acl_policy", "id", name) 5174 if err != nil { 5175 return nil, fmt.Errorf("acl policy lookup failed: %v", err) 5176 } 5177 ws.Add(watchCh) 5178 5179 if existing != nil { 5180 return existing.(*structs.ACLPolicy), nil 5181 } 5182 return nil, nil 5183} 5184 5185// ACLPolicyByNamePrefix is used to lookup policies by prefix 5186func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { 5187 txn := s.db.ReadTxn() 5188 5189 iter, err := txn.Get("acl_policy", "id_prefix", prefix) 5190 if err != nil { 5191 return nil, fmt.Errorf("acl policy lookup failed: %v", err) 5192 } 5193 ws.Add(iter.WatchCh()) 5194 5195 return iter, nil 5196} 5197 5198// ACLPolicies returns an iterator over all the acl policies 5199func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) { 5200 txn := s.db.ReadTxn() 5201 5202 // Walk the entire table 5203 iter, err := txn.Get("acl_policy", "id") 5204 if err != nil { 5205 return nil, err 5206 } 5207 ws.Add(iter.WatchCh()) 5208 return iter, nil 5209} 5210 5211// UpsertACLTokens is used to create or update a set of ACL tokens 5212func (s *StateStore) UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error { 5213 txn := s.db.WriteTxnMsgT(msgType, index) 5214 defer txn.Abort() 5215 5216 for _, token := range tokens { 5217 // Ensure the token hash is non-nil. This should be done outside the state store 5218 // for performance reasons, but we check here for defense in depth.
5219 if len(token.Hash) == 0 { 5220 token.SetHash() 5221 } 5222 5223 // Check if the token already exists 5224 existing, err := txn.First("acl_token", "id", token.AccessorID) 5225 if err != nil { 5226 return fmt.Errorf("token lookup failed: %v", err) 5227 } 5228 5229 // Update all the indexes 5230 if existing != nil { 5231 existTK := existing.(*structs.ACLToken) 5232 token.CreateIndex = existTK.CreateIndex 5233 token.ModifyIndex = index 5234 5235 // Do not allow SecretID or create time to change 5236 token.SecretID = existTK.SecretID 5237 token.CreateTime = existTK.CreateTime 5238 5239 } else { 5240 token.CreateIndex = index 5241 token.ModifyIndex = index 5242 } 5243 5244 // Update the token 5245 if err := txn.Insert("acl_token", token); err != nil { 5246 return fmt.Errorf("upserting token failed: %v", err) 5247 } 5248 } 5249 5250 // Update the indexes table 5251 if err := txn.Insert("index", &IndexEntry{"acl_token", index}); err != nil { 5252 return fmt.Errorf("index update failed: %v", err) 5253 } 5254 return txn.Commit() 5255} 5256 5257// DeleteACLTokens deletes the tokens with the given accessor ids 5258func (s *StateStore) DeleteACLTokens(msgType structs.MessageType, index uint64, ids []string) error { 5259 txn := s.db.WriteTxnMsgT(msgType, index) 5260 defer txn.Abort() 5261 5262 // Delete the tokens 5263 for _, id := range ids { 5264 if _, err := txn.DeleteAll("acl_token", "id", id); err != nil { 5265 return fmt.Errorf("deleting acl token failed: %v", err) 5266 } 5267 } 5268 if err := txn.Insert("index", &IndexEntry{"acl_token", index}); err != nil { 5269 return fmt.Errorf("index update failed: %v", err) 5270 } 5271 return txn.Commit() 5272} 5273 5274// ACLTokenByAccessorID is used to lookup a token by accessor ID 5275func (s *StateStore) ACLTokenByAccessorID(ws memdb.WatchSet, id string) (*structs.ACLToken, error) { 5276 if id == "" { 5277 return nil, fmt.Errorf("acl token lookup failed: missing accessor id") 5278 } 5279 5280 txn := s.db.ReadTxn() 5281 5282 watchCh, existing, err := txn.FirstWatch("acl_token", "id", id) 5283 if err != nil { 5284 return nil, fmt.Errorf("acl token lookup failed: %v", err) 5285 } 5286 ws.Add(watchCh) 5287 5288 if existing != nil { 5289 return existing.(*structs.ACLToken), nil 5290 } 5291 return nil, nil 5292} 5293 5294// ACLTokenBySecretID is used to lookup a token by secret ID 5295func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) { 5296 if secretID == "" { 5297 return nil, fmt.Errorf("acl token lookup failed: missing secret id") 5298 } 5299 5300 txn := s.db.ReadTxn() 5301 5302 watchCh, existing, err := txn.FirstWatch("acl_token", "secret", secretID) 5303 if err != nil { 5304 return nil, fmt.Errorf("acl token lookup failed: %v", err) 5305 } 5306 ws.Add(watchCh) 5307 5308 if existing != nil { 5309 return existing.(*structs.ACLToken), nil 5310 } 5311 return nil, nil 5312} 5313 5314// ACLTokenByAccessorIDPrefix is used to lookup tokens by prefix 5315func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { 5316 txn := s.db.ReadTxn() 5317 5318 iter, err := txn.Get("acl_token", "id_prefix", prefix) 5319 if err != nil { 5320 return nil, fmt.Errorf("acl token lookup failed: %v", err) 5321 } 5322 ws.Add(iter.WatchCh()) 5323 return iter, nil 5324} 5325 5326// ACLTokens returns an iterator over all the tokens 5327func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) { 5328 txn := s.db.ReadTxn() 5329 5330 // Walk the entire table 
5331 iter, err := txn.Get("acl_token", "id") 5332 if err != nil { 5333 return nil, err 5334 } 5335 ws.Add(iter.WatchCh()) 5336 return iter, nil 5337} 5338 5339// ACLTokensByGlobal returns an iterator over all the tokens filtered by global value 5340func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb.ResultIterator, error) { 5341 txn := s.db.ReadTxn() 5342 5343 // Walk the entire table 5344 iter, err := txn.Get("acl_token", "global", globalVal) 5345 if err != nil { 5346 return nil, err 5347 } 5348 ws.Add(iter.WatchCh()) 5349 return iter, nil 5350} 5351 5352// CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index 5353func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) { 5354 txn := s.db.ReadTxn() 5355 5356 // Lookup the bootstrap sentinel 5357 out, err := txn.First("index", "id", "acl_token_bootstrap") 5358 if err != nil { 5359 return false, 0, err 5360 } 5361 5362 // No entry, we haven't bootstrapped yet 5363 if out == nil { 5364 return true, 0, nil 5365 } 5366 5367 // Return the reset index if we've already bootstrapped 5368 return false, out.(*IndexEntry).Value, nil 5369} 5370 5371// BootstrapACLTokens is used to create an initial ACL token 5372func (s *StateStore) BootstrapACLTokens(msgType structs.MessageType, index uint64, resetIndex uint64, token *structs.ACLToken) error { 5373 txn := s.db.WriteTxnMsgT(msgType, index) 5374 defer txn.Abort() 5375 5376 // Check if we have already done a bootstrap 5377 existing, err := txn.First("index", "id", "acl_token_bootstrap") 5378 if err != nil { 5379 return fmt.Errorf("bootstrap check failed: %v", err) 5380 } 5381 if existing != nil { 5382 if resetIndex == 0 { 5383 return fmt.Errorf("ACL bootstrap already done") 5384 } else if resetIndex != existing.(*IndexEntry).Value { 5385 return fmt.Errorf("Invalid reset index for ACL bootstrap") 5386 } 5387 } 5388 5389 // Set the create/modify indexes 5390 token.CreateIndex = index 5391 token.ModifyIndex = index 5392 5393 // Insert the token 5394 if err := txn.Insert("acl_token", token); err != nil { 5395 return fmt.Errorf("upserting token failed: %v", err) 5396 } 5397 5398 // Update the indexes table, prevents future bootstrap until reset 5399 if err := txn.Insert("index", &IndexEntry{"acl_token", index}); err != nil { 5400 return fmt.Errorf("index update failed: %v", err) 5401 } 5402 if err := txn.Insert("index", &IndexEntry{"acl_token_bootstrap", index}); err != nil { 5403 return fmt.Errorf("index update failed: %v", err) 5404 } 5405 return txn.Commit() 5406} 5407 5408// UpsertOneTimeToken is used to create or update a one-time ACL 5409// token. Validating that we're not upserting an already-expired token is 5410// made the responsibility of the caller to facilitate testing.
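// A construction sketch (hypothetical values; in practice the ACL RPC
// endpoint fills these fields in before the Raft apply):
//
//	ott := &structs.OneTimeToken{
//		OneTimeSecretID: uuid.Generate(),
//		AccessorID:      aclToken.AccessorID,
//		ExpiresAt:       time.Now().Add(10 * time.Minute),
//	}
//	err := s.UpsertOneTimeToken(structs.OneTimeTokenUpsertRequestType, raftIndex, ott)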
// UpsertOneTimeToken is used to create or update a one-time token. Validating
// that we're not upserting an already-expired token is the responsibility of
// the caller, to facilitate testing.
func (s *StateStore) UpsertOneTimeToken(msgType structs.MessageType, index uint64, token *structs.OneTimeToken) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	// we expect the RPC call to set the ExpiresAt
	if token.ExpiresAt.IsZero() {
		return fmt.Errorf("one-time token must have an ExpiresAt time")
	}

	// Update all the indexes
	token.CreateIndex = index
	token.ModifyIndex = index

	// Create the token
	if err := txn.Insert("one_time_token", token); err != nil {
		return fmt.Errorf("upserting one-time token failed: %v", err)
	}

	// Update the indexes table
	if err := txn.Insert("index", &IndexEntry{"one_time_token", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return txn.Commit()
}

// DeleteOneTimeTokens deletes the tokens with the given ACLToken accessor IDs
func (s *StateStore) DeleteOneTimeTokens(msgType structs.MessageType, index uint64, ids []string) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	var deleted int
	for _, id := range ids {
		d, err := txn.DeleteAll("one_time_token", "id", id)
		if err != nil {
			return fmt.Errorf("deleting one-time token failed: %v", err)
		}
		deleted += d
	}

	if deleted > 0 {
		if err := txn.Insert("index", &IndexEntry{"one_time_token", index}); err != nil {
			return fmt.Errorf("index update failed: %v", err)
		}
	}
	return txn.Commit()
}

// ExpireOneTimeTokens deletes tokens that have expired
func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint64) error {
	txn := s.db.WriteTxnMsgT(msgType, index)
	defer txn.Abort()

	iter, err := s.oneTimeTokensExpiredTxn(txn, nil)
	if err != nil {
		return err
	}

	var deleted int
	for {
		raw := iter.Next()
		if raw == nil {
			break
		}
		ott, ok := raw.(*structs.OneTimeToken)
		if !ok || ott == nil {
			return fmt.Errorf("could not decode one-time token")
		}
		d, err := txn.DeleteAll("one_time_token", "secret", ott.OneTimeSecretID)
		if err != nil {
			return fmt.Errorf("deleting one-time token failed: %v", err)
		}
		deleted += d
	}

	if deleted > 0 {
		if err := txn.Insert("index", &IndexEntry{"one_time_token", index}); err != nil {
			return fmt.Errorf("index update failed: %v", err)
		}
	}
	return txn.Commit()
}

// oneTimeTokensExpiredTxn returns an iterator over all expired one-time
// tokens. A nil WatchSet may be passed; memdb.WatchSet.Add is a no-op on nil.
func (s *StateStore) oneTimeTokensExpiredTxn(txn *txn, ws memdb.WatchSet) (memdb.ResultIterator, error) {
	iter, err := txn.Get("one_time_token", "id")
	if err != nil {
		return nil, fmt.Errorf("one-time token lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())
	iter = memdb.NewFilterIterator(iter, expiredOneTimeTokenFilter(time.Now()))
	return iter, nil
}
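// exampleExpireLoop is an illustrative sketch, not part of the original API:
// it shows how a caller could periodically reap expired one-time tokens. In
// Nomad proper this kind of write flows through Raft before reaching the
// state store, so driving it directly like this is only meaningful in tests.
func exampleExpireLoop(ctx context.Context, s *StateStore, msgType structs.MessageType, index uint64, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			index++ // each apply would normally carry the next Raft index
			if err := s.ExpireOneTimeTokens(msgType, index); err != nil {
				s.logger.Error("expiring one-time tokens failed", "error", err)
			}
		}
	}
}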
// OneTimeTokenBySecret is used to lookup a token by secret
func (s *StateStore) OneTimeTokenBySecret(ws memdb.WatchSet, secret string) (*structs.OneTimeToken, error) {
	if secret == "" {
		return nil, fmt.Errorf("one-time token lookup failed: missing secret")
	}

	txn := s.db.ReadTxn()

	watchCh, existing, err := txn.FirstWatch("one_time_token", "secret", secret)
	if err != nil {
		return nil, fmt.Errorf("one-time token lookup failed: %v", err)
	}
	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.OneTimeToken), nil
	}
	return nil, nil
}

// expiredOneTimeTokenFilter returns a filter function that keeps only
// expired one-time tokens. Note the memdb filter convention: returning true
// removes the object from the results, so unexpired tokens are dropped.
func expiredOneTimeTokenFilter(now time.Time) func(interface{}) bool {
	return func(raw interface{}) bool {
		ott, ok := raw.(*structs.OneTimeToken)
		if !ok {
			return true
		}

		return ott.ExpiresAt.After(now)
	}
}

// SchedulerConfig is used to get the current scheduler configuration.
func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
	tx := s.db.ReadTxn()
	defer tx.Abort()

	// Get the scheduler config
	c, err := tx.First("scheduler_config", "id")
	if err != nil {
		return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err)
	}

	config, ok := c.(*structs.SchedulerConfiguration)
	if !ok {
		return 0, nil, nil
	}

	return config.ModifyIndex, config, nil
}

// SchedulerSetConfig is used to set the current scheduler configuration.
func (s *StateStore) SchedulerSetConfig(index uint64, config *structs.SchedulerConfiguration) error {
	tx := s.db.WriteTxn(index)
	defer tx.Abort()

	if err := s.schedulerSetConfigTxn(index, tx, config); err != nil {
		return err
	}

	return tx.Commit()
}

func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadata, error) {
	txn := s.db.ReadTxn()
	defer txn.Abort()

	// Get the cluster metadata
	watchCh, m, err := txn.FirstWatch("cluster_meta", "id")
	if err != nil {
		return nil, errors.Wrap(err, "failed cluster metadata lookup")
	}
	ws.Add(watchCh)

	if m != nil {
		return m.(*structs.ClusterMetadata), nil
	}

	return nil, nil
}

func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	if err := s.setClusterMetadata(txn, meta); err != nil {
		return errors.Wrap(err, "set cluster metadata failed")
	}

	return txn.Commit()
}

// WithWriteTransaction executes the passed function within a write
// transaction, and returns its result. If the invocation returns no error,
// the transaction is committed; otherwise, it is aborted.
func (s *StateStore) WithWriteTransaction(msgType structs.MessageType, index uint64, fn func(Txn) error) error {
	tx := s.db.WriteTxnMsgT(msgType, index)
	defer tx.Abort()

	err := fn(tx)
	if err == nil {
		return tx.Commit()
	}
	return err
}
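// exampleBatchedWrite is an illustrative sketch, not part of the original
// API: it shows WithWriteTransaction in use. Several table writes share a
// single transaction, so they either all commit or all abort together.
func exampleBatchedWrite(s *StateStore, msgType structs.MessageType, index uint64,
	policies []*structs.ScalingPolicy, staleIDs []string) error {
	return s.WithWriteTransaction(msgType, index, func(tx Txn) error {
		if err := s.UpsertScalingPoliciesTxn(index, policies, tx); err != nil {
			return err
		}
		return s.DeleteScalingPoliciesTxn(index, staleIDs, tx)
	})
}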
// SchedulerCASConfig is used to update the scheduler configuration with a
// given Raft index. If the CAS index specified is not equal to the last
// observed index for the config, then the call is a noop.
func (s *StateStore) SchedulerCASConfig(index, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
	tx := s.db.WriteTxn(index)
	defer tx.Abort()

	// Check for an existing config
	existing, err := tx.First("scheduler_config", "id")
	if err != nil {
		return false, fmt.Errorf("failed scheduler config lookup: %s", err)
	}

	// If the existing index does not match the provided CAS index arg, then
	// we shouldn't update anything and can safely return early here.
	e, ok := existing.(*structs.SchedulerConfiguration)
	if !ok || (e != nil && e.ModifyIndex != cidx) {
		return false, nil
	}

	if err := s.schedulerSetConfigTxn(index, tx, config); err != nil {
		return false, err
	}

	if err := tx.Commit(); err != nil {
		return false, err
	}
	return true, nil
}

func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *txn, config *structs.SchedulerConfiguration) error {
	// Check for an existing config
	existing, err := tx.First("scheduler_config", "id")
	if err != nil {
		return fmt.Errorf("failed scheduler config lookup: %s", err)
	}

	// Set the indexes.
	if existing != nil {
		config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex
	} else {
		config.CreateIndex = idx
	}
	config.ModifyIndex = idx

	if err := tx.Insert("scheduler_config", config); err != nil {
		return fmt.Errorf("failed updating scheduler config: %s", err)
	}
	return nil
}
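// exampleSchedulerCASUpdate is an illustrative sketch, not part of the
// original API: it shows the check-and-set contract. A caller reads the
// current modify index via SchedulerConfig, mutates a copy, and the update
// is applied only if no concurrent write bumped the index in the meantime.
func exampleSchedulerCASUpdate(s *StateStore, index uint64, mutate func(*structs.SchedulerConfiguration)) (bool, error) {
	cidx, config, err := s.SchedulerConfig()
	if err != nil || config == nil {
		return false, err
	}
	// Objects returned by the state store must never be mutated in place,
	// so this sketch works on a shallow copy.
	updated := *config
	mutate(&updated)
	return s.SchedulerCASConfig(index, cidx, &updated)
}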
func (s *StateStore) setClusterMetadata(txn *txn, meta *structs.ClusterMetadata) error {
	// Check for an existing config, if it exists, verify that the cluster ID matches
	existing, err := txn.First("cluster_meta", "id")
	if err != nil {
		return fmt.Errorf("failed cluster meta lookup: %v", err)
	}

	if existing != nil {
		existingClusterID := existing.(*structs.ClusterMetadata).ClusterID
		if meta.ClusterID != existingClusterID && existingClusterID != "" {
			// there is a bug in cluster ID detection
			return fmt.Errorf("refusing to set new cluster id, previous: %s, new: %s", existingClusterID, meta.ClusterID)
		}
	}

	// update is technically a noop, unless someday we add more / mutable fields
	if err := txn.Insert("cluster_meta", meta); err != nil {
		return fmt.Errorf("set cluster metadata failed: %v", err)
	}

	return nil
}

// UpsertScalingPolicies is used to insert a set of scaling policies.
func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*structs.ScalingPolicy) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	if err := s.UpsertScalingPoliciesTxn(index, scalingPolicies, txn); err != nil {
		return err
	}

	return txn.Commit()
}

// UpsertScalingPoliciesTxn is used to insert a set of scaling policies
// within an existing transaction.
func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*structs.ScalingPolicy,
	txn *txn) error {

	hadUpdates := false

	for _, policy := range scalingPolicies {
		// Check if the scaling policy already exists
		// Policy uniqueness is based on target and type
		it, err := txn.Get("scaling_policy", "target",
			policy.Target[structs.ScalingTargetNamespace],
			policy.Target[structs.ScalingTargetJob],
			policy.Target[structs.ScalingTargetGroup],
			policy.Target[structs.ScalingTargetTask],
		)
		if err != nil {
			return fmt.Errorf("scaling policy lookup failed: %v", err)
		}

		// Check if type matches
		var existing *structs.ScalingPolicy
		for raw := it.Next(); raw != nil; raw = it.Next() {
			p := raw.(*structs.ScalingPolicy)
			if p.Type == policy.Type {
				existing = p
				break
			}
		}

		// Setup the indexes correctly
		if existing != nil {
			if !existing.Diff(policy) {
				continue
			}
			policy.ID = existing.ID
			policy.CreateIndex = existing.CreateIndex
		} else {
			// policy.ID must have been set already in Job.Register before log apply
			policy.CreateIndex = index
		}
		policy.ModifyIndex = index

		// Insert the scaling policy
		hadUpdates = true
		if err := txn.Insert("scaling_policy", policy); err != nil {
			return err
		}
	}

	// Update the indexes table for scaling policy if we updated any policies
	if hadUpdates {
		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
			return fmt.Errorf("index update failed: %v", err)
		}
	}

	return nil
}

// NamespaceByName is used to lookup a namespace by name
func (s *StateStore) NamespaceByName(ws memdb.WatchSet, name string) (*structs.Namespace, error) {
	txn := s.db.ReadTxn()
	return s.namespaceByNameImpl(ws, txn, name)
}

// namespaceByNameImpl is used to lookup a namespace by name within an
// existing transaction
func (s *StateStore) namespaceByNameImpl(ws memdb.WatchSet, txn *txn, name string) (*structs.Namespace, error) {
	watchCh, existing, err := txn.FirstWatch(TableNamespaces, "id", name)
	if err != nil {
		return nil, fmt.Errorf("namespace lookup failed: %v", err)
	}
	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.Namespace), nil
	}
	return nil, nil
}

// namespaceExists returns whether a namespace exists
func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) {
	if namespace == structs.DefaultNamespace {
		return true, nil
	}

	existing, err := txn.First(TableNamespaces, "id", namespace)
	if err != nil {
		return false, fmt.Errorf("namespace lookup failed: %v", err)
	}

	return existing != nil, nil
}

// NamespacesByNamePrefix is used to lookup namespaces by prefix
func (s *StateStore) NamespacesByNamePrefix(ws memdb.WatchSet, namePrefix string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get(TableNamespaces, "id_prefix", namePrefix)
	if err != nil {
		return nil, fmt.Errorf("namespaces lookup failed: %v", err)
	}
	ws.Add(iter.WatchCh())

	return iter, nil
}
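// exampleNamespacesByPrefix is an illustrative sketch, not part of the
// original API: it shows the memdb iterator consumption pattern used
// throughout this file, where Next returns nil once the results are
// exhausted.
func exampleNamespacesByPrefix(s *StateStore, prefix string) ([]*structs.Namespace, error) {
	ws := memdb.NewWatchSet()
	iter, err := s.NamespacesByNamePrefix(ws, prefix)
	if err != nil {
		return nil, err
	}
	var out []*structs.Namespace
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		out = append(out, raw.(*structs.Namespace))
	}
	return out, nil
}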
// Namespaces returns an iterator over all the namespaces
func (s *StateStore) Namespaces(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire namespaces table
	iter, err := txn.Get(TableNamespaces, "id")
	if err != nil {
		return nil, err
	}
	ws.Add(iter.WatchCh())
	return iter, nil
}

// NamespaceNames returns the names of all the namespaces
func (s *StateStore) NamespaceNames() ([]string, error) {
	it, err := s.Namespaces(nil)
	if err != nil {
		return nil, err
	}

	nses := []string{}
	for {
		next := it.Next()
		if next == nil {
			break
		}
		ns := next.(*structs.Namespace)
		nses = append(nses, ns.Name)
	}

	return nses, nil
}

// UpsertNamespaces is used to register or update a set of namespaces
func (s *StateStore) UpsertNamespaces(index uint64, namespaces []*structs.Namespace) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	for _, ns := range namespaces {
		if err := s.upsertNamespaceImpl(index, txn, ns); err != nil {
			return err
		}
	}

	if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// upsertNamespaceImpl is used to upsert a namespace
func (s *StateStore) upsertNamespaceImpl(index uint64, txn *txn, namespace *structs.Namespace) error {
	// Ensure the namespace hash is non-nil. This should be done outside the state store
	// for performance reasons, but we check here for defense in depth.
	ns := namespace
	if len(ns.Hash) == 0 {
		ns.SetHash()
	}

	// Check if the namespace already exists
	existing, err := txn.First(TableNamespaces, "id", ns.Name)
	if err != nil {
		return fmt.Errorf("namespace lookup failed: %v", err)
	}

	// Setup the indexes correctly and determine which quotas need to be
	// reconciled
	var oldQuota string
	if existing != nil {
		exist := existing.(*structs.Namespace)
		ns.CreateIndex = exist.CreateIndex
		ns.ModifyIndex = index

		// Grab the old quota on the namespace
		oldQuota = exist.Quota
	} else {
		ns.CreateIndex = index
		ns.ModifyIndex = index
	}

	// Validate that the quota on the new namespace exists
	if ns.Quota != "" {
		exists, err := s.quotaSpecExists(txn, ns.Quota)
		if err != nil {
			return fmt.Errorf("looking up namespace quota %q failed: %v", ns.Quota, err)
		} else if !exists {
			return fmt.Errorf("namespace %q using non-existent quota %q", ns.Name, ns.Quota)
		}
	}

	// Insert the namespace
	if err := txn.Insert(TableNamespaces, ns); err != nil {
		return fmt.Errorf("namespace insert failed: %v", err)
	}

	// Reconcile changed quotas
	return s.quotaReconcile(index, txn, ns.Quota, oldQuota)
}
// DeleteNamespaces is used to remove a set of namespaces
func (s *StateStore) DeleteNamespaces(index uint64, names []string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	for _, name := range names {
		// Lookup the namespace
		existing, err := txn.First(TableNamespaces, "id", name)
		if err != nil {
			return fmt.Errorf("namespace lookup failed: %v", err)
		}
		if existing == nil {
			return fmt.Errorf("namespace not found")
		}

		ns := existing.(*structs.Namespace)
		if ns.Name == structs.DefaultNamespace {
			return fmt.Errorf("default namespace can not be deleted")
		}

		// Ensure that the namespace doesn't have any non-terminal jobs
		iter, err := s.jobsByNamespaceImpl(nil, name, txn)
		if err != nil {
			return err
		}

		for {
			raw := iter.Next()
			if raw == nil {
				break
			}
			job := raw.(*structs.Job)

			if job.Status != structs.JobStatusDead {
				return fmt.Errorf("namespace %q contains at least one non-terminal job %q. "+
					"All jobs must be terminal in the namespace before it can be deleted", name, job.ID)
			}
		}

		// Delete the namespace
		if err := txn.Delete(TableNamespaces, existing); err != nil {
			return fmt.Errorf("namespace deletion failed: %v", err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return txn.Commit()
}

// DeleteScalingPolicies is used to delete a set of scaling policies by ID
func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error {
	txn := s.db.WriteTxn(index)
	defer txn.Abort()

	err := s.DeleteScalingPoliciesTxn(index, ids, txn)
	if err == nil {
		return txn.Commit()
	}

	return err
}

// DeleteScalingPoliciesTxn is used to delete a set of scaling policies by ID
// within an existing transaction
func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *txn) error {
	if len(ids) == 0 {
		return nil
	}

	for _, id := range ids {
		// Lookup the scaling policy
		existing, err := txn.First("scaling_policy", "id", id)
		if err != nil {
			return fmt.Errorf("scaling policy lookup failed: %v", err)
		}
		if existing == nil {
			return fmt.Errorf("scaling policy not found")
		}

		// Delete the scaling policy
		if err := txn.Delete("scaling_policy", existing); err != nil {
			return fmt.Errorf("scaling policy delete failed: %v", err)
		}
	}

	if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	return nil
}

// ScalingPolicies returns an iterator over all the scaling policies
func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	// Walk the entire scaling_policy table
	iter, err := txn.Get("scaling_policy", "id")
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	return iter, nil
}
// ScalingPoliciesByTypePrefix returns an iterator over scaling policies with
// a certain type prefix.
func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("scaling_policy", "type_prefix", t)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())
	return iter, nil
}

// ScalingPoliciesByNamespace returns an iterator over scaling policies in the
// given namespace, optionally narrowed by a policy type prefix.
func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, typ string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("scaling_policy", "target_prefix", namespace)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	// Wrap the iterator in a filter to exact match the namespace
	iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))

	// If policy type is specified as well, wrap again
	if typ != "" {
		iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool {
			p, ok := raw.(*structs.ScalingPolicy)
			if !ok {
				return true
			}
			return !strings.HasPrefix(p.Type, typ)
		})
	}

	return iter, nil
}

// ScalingPoliciesByJob returns an iterator over the scaling policies attached
// to the given job, optionally narrowed to an exact policy type.
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID, policyType string) (memdb.ResultIterator,
	error) {
	txn := s.db.ReadTxn()
	iter, err := s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
	if err != nil {
		return nil, err
	}

	if policyType == "" {
		return iter, nil
	}

	filter := func(raw interface{}) bool {
		p, ok := raw.(*structs.ScalingPolicy)
		if !ok {
			return true
		}
		return policyType != p.Type
	}

	return memdb.NewFilterIterator(iter, filter), nil
}

// ScalingPoliciesByJobTxn returns an iterator over the scaling policies
// attached to the given job within an existing transaction.
func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,
	txn *txn) (memdb.ResultIterator, error) {

	iter, err := txn.Get("scaling_policy", "target_prefix", namespace, jobID)
	if err != nil {
		return nil, err
	}

	ws.Add(iter.WatchCh())

	filter := func(raw interface{}) bool {
		d, ok := raw.(*structs.ScalingPolicy)
		if !ok {
			return true
		}

		return d.Target[structs.ScalingTargetJob] != jobID
	}

	// Wrap the iterator in a filter
	wrap := memdb.NewFilterIterator(iter, filter)
	return wrap, nil
}

// ScalingPolicyByID returns the scaling policy with the given ID, or nil if
// it does not exist.
func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) {
	txn := s.db.ReadTxn()

	watchCh, existing, err := txn.FirstWatch("scaling_policy", "id", id)
	if err != nil {
		return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
	}
	ws.Add(watchCh)

	if existing != nil {
		return existing.(*structs.ScalingPolicy), nil
	}

	return nil, nil
}
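// exampleTypeFilter is an illustrative sketch, not part of the original API:
// it highlights the memdb.FilterIterator convention the lookups above rely
// on, namely that the filter function returns true for objects that should
// be dropped from the results, not kept.
func exampleTypeFilter(iter memdb.ResultIterator, typ string) memdb.ResultIterator {
	return memdb.NewFilterIterator(iter, func(raw interface{}) bool {
		p, ok := raw.(*structs.ScalingPolicy)
		// Drop anything that isn't a scaling policy of the wanted type.
		return !ok || p.Type != typ
	})
}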
// ScalingPolicyByTargetAndType returns a fully-qualified policy against a
// target and policy type, or nil if it does not exist. This method does not
// honor the watchset on the policy type, just the target.
func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy,
	error) {
	txn := s.db.ReadTxn()

	namespace := target[structs.ScalingTargetNamespace]
	job := target[structs.ScalingTargetJob]
	group := target[structs.ScalingTargetGroup]
	task := target[structs.ScalingTargetTask]

	it, err := txn.Get("scaling_policy", "target", namespace, job, group, task)
	if err != nil {
		return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
	}

	ws.Add(it.WatchCh())

	// Check for type
	var existing *structs.ScalingPolicy
	for raw := it.Next(); raw != nil; raw = it.Next() {
		p := raw.(*structs.ScalingPolicy)
		if p.Type == typ {
			existing = p
			break
		}
	}

	if existing != nil {
		return existing, nil
	}

	return nil, nil
}

// ScalingPoliciesByIDPrefix returns an iterator over scaling policies in the
// given namespace whose IDs match the given prefix.
func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace string, prefix string) (memdb.ResultIterator, error) {
	txn := s.db.ReadTxn()

	iter, err := txn.Get("scaling_policy", "id_prefix", prefix)
	if err != nil {
		return nil, fmt.Errorf("scaling policy lookup failed: %v", err)
	}

	ws.Add(iter.WatchCh())

	iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))

	return iter, nil
}

// scalingPolicyNamespaceFilter returns a filter function that filters out all
// scaling policies not targeting the given namespace.
func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool {
	return func(raw interface{}) bool {
		p, ok := raw.(*structs.ScalingPolicy)
		if !ok {
			return true
		}

		return p.Target[structs.ScalingTargetNamespace] != namespace
	}
}

// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
	StateStore
}

// DenormalizeAllocationsMap takes in a map of nodes to allocations, queries
// the Allocation for each of the Allocation diffs, merges the updated
// attributes with the existing Allocation, and attaches the Job provided.
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation) error {
	for nodeID, allocs := range nodeAllocations {
		denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs)
		if err != nil {
			return err
		}

		nodeAllocations[nodeID] = denormalizedAllocs
	}
	return nil
}

// DenormalizeAllocationSlice queries the Allocation for each allocation diff
// represented as an Allocation and merges the updated attributes with the
// existing Allocation, and attaches the Job provided.
//
// This should only be called on terminal allocs, particularly stopped or
// preempted allocs.
func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation) ([]*structs.Allocation, error) {
	allocDiffs := make([]*structs.AllocationDiff, len(allocs))
	for i, alloc := range allocs {
		allocDiffs[i] = alloc.AllocationDiff()
	}

	return s.DenormalizeAllocationDiffSlice(allocDiffs)
}
// DenormalizeAllocationDiffSlice queries the Allocation for each
// AllocationDiff and merges the updated attributes with the existing
// Allocation, and attaches the Job provided.
//
// This should only be called on terminal allocs, particularly stopped or
// preempted allocs.
func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff) ([]*structs.Allocation, error) {
	// Output index for denormalized Allocations
	j := 0

	denormalizedAllocs := make([]*structs.Allocation, len(allocDiffs))
	for _, allocDiff := range allocDiffs {
		alloc, err := s.AllocByID(nil, allocDiff.ID)
		if err != nil {
			return nil, fmt.Errorf("alloc lookup failed: %v", err)
		}
		if alloc == nil {
			return nil, fmt.Errorf("alloc %v doesn't exist", allocDiff.ID)
		}

		// Merge the updates to the Allocation. Don't update alloc.Job for
		// terminal allocs so alloc refers to the latest Job view before
		// destruction and to ease handler implementations.
		allocCopy := alloc.Copy()

		if allocDiff.PreemptedByAllocation != "" {
			// If alloc is a preempted alloc
			allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation
			allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation)
			allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict
		} else {
			// If alloc is a stopped alloc
			allocCopy.DesiredDescription = allocDiff.DesiredDescription
			allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
			if allocDiff.ClientStatus != "" {
				allocCopy.ClientStatus = allocDiff.ClientStatus
			}
			if allocDiff.FollowupEvalID != "" {
				allocCopy.FollowupEvalID = allocDiff.FollowupEvalID
			}
		}
		if allocDiff.ModifyTime != 0 {
			allocCopy.ModifyTime = allocDiff.ModifyTime
		}

		// Update the allocDiff in the slice to equal the denormalized alloc
		denormalizedAllocs[j] = allocCopy
		j++
	}
	// Retain only the denormalized Allocations in the slice
	denormalizedAllocs = denormalizedAllocs[:j]
	return denormalizedAllocs, nil
}

func getPreemptedAllocDesiredDescription(preemptedByAllocID string) string {
	return fmt.Sprintf("Preempted by alloc ID %v", preemptedByAllocID)
}

// StateRestore is used to optimize the performance when restoring state by
// only using a single large transaction instead of thousands of sub
// transactions.
type StateRestore struct {
	txn *txn
}

// Abort is used to abort the restore operation
func (r *StateRestore) Abort() {
	r.txn.Abort()
}

// Commit is used to commit the restore operation
func (r *StateRestore) Commit() error {
	return r.txn.Commit()
}

// NodeRestore is used to restore a node
func (r *StateRestore) NodeRestore(node *structs.Node) error {
	if err := r.txn.Insert("nodes", node); err != nil {
		return fmt.Errorf("node insert failed: %v", err)
	}
	return nil
}

// JobRestore is used to restore a job
func (r *StateRestore) JobRestore(job *structs.Job) error {
	if err := r.txn.Insert("jobs", job); err != nil {
		return fmt.Errorf("job insert failed: %v", err)
	}
	return nil
}

// EvalRestore is used to restore an evaluation
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
	if err := r.txn.Insert("evals", eval); err != nil {
		return fmt.Errorf("eval insert failed: %v", err)
	}
	return nil
}
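// exampleRestore is an illustrative sketch, not part of the original API: it
// shows how an FSM-style caller drives a StateRestore, inserting every
// snapshot object through the single large transaction before one final
// Commit. It assumes the store's Restore constructor defined earlier in this
// file.
func exampleRestore(s *StateStore, nodes []*structs.Node, jobs []*structs.Job) error {
	restore, err := s.Restore()
	if err != nil {
		return err
	}
	defer restore.Abort() // a no-op once Commit has succeeded

	for _, n := range nodes {
		if err := restore.NodeRestore(n); err != nil {
			return err
		}
	}
	for _, j := range jobs {
		if err := restore.JobRestore(j); err != nil {
			return err
		}
	}
	return restore.Commit()
}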
// AllocRestore is used to restore an allocation
func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
	if err := r.txn.Insert("allocs", alloc); err != nil {
		return fmt.Errorf("alloc insert failed: %v", err)
	}
	return nil
}

// IndexRestore is used to restore an index
func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
	if err := r.txn.Insert("index", idx); err != nil {
		return fmt.Errorf("index insert failed: %v", err)
	}
	return nil
}

// PeriodicLaunchRestore is used to restore a periodic launch.
func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) error {
	if err := r.txn.Insert("periodic_launch", launch); err != nil {
		return fmt.Errorf("periodic launch insert failed: %v", err)
	}
	return nil
}

// JobSummaryRestore is used to restore a job summary
func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
	if err := r.txn.Insert("job_summary", jobSummary); err != nil {
		return fmt.Errorf("job summary insert failed: %v", err)
	}
	return nil
}

// JobVersionRestore is used to restore a job version
func (r *StateRestore) JobVersionRestore(version *structs.Job) error {
	if err := r.txn.Insert("job_version", version); err != nil {
		return fmt.Errorf("job version insert failed: %v", err)
	}
	return nil
}

// DeploymentRestore is used to restore a deployment
func (r *StateRestore) DeploymentRestore(deployment *structs.Deployment) error {
	if err := r.txn.Insert("deployment", deployment); err != nil {
		return fmt.Errorf("deployment insert failed: %v", err)
	}
	return nil
}

// VaultAccessorRestore is used to restore a vault accessor
func (r *StateRestore) VaultAccessorRestore(accessor *structs.VaultAccessor) error {
	if err := r.txn.Insert("vault_accessors", accessor); err != nil {
		return fmt.Errorf("vault accessor insert failed: %v", err)
	}
	return nil
}

// SITokenAccessorRestore is used to restore an SI token accessor
func (r *StateRestore) SITokenAccessorRestore(accessor *structs.SITokenAccessor) error {
	if err := r.txn.Insert(siTokenAccessorTable, accessor); err != nil {
		return errors.Wrap(err, "si token accessor insert failed")
	}
	return nil
}

// ACLPolicyRestore is used to restore an ACL policy
func (r *StateRestore) ACLPolicyRestore(policy *structs.ACLPolicy) error {
	if err := r.txn.Insert("acl_policy", policy); err != nil {
		return fmt.Errorf("inserting acl policy failed: %v", err)
	}
	return nil
}

// ACLTokenRestore is used to restore an ACL token
func (r *StateRestore) ACLTokenRestore(token *structs.ACLToken) error {
	if err := r.txn.Insert("acl_token", token); err != nil {
		return fmt.Errorf("inserting acl token failed: %v", err)
	}
	return nil
}

// OneTimeTokenRestore is used to restore a one-time token
func (r *StateRestore) OneTimeTokenRestore(token *structs.OneTimeToken) error {
	if err := r.txn.Insert("one_time_token", token); err != nil {
		return fmt.Errorf("inserting one-time token failed: %v", err)
	}
	return nil
}

// SchedulerConfigRestore is used to restore the scheduler configuration
func (r *StateRestore) SchedulerConfigRestore(schedConfig *structs.SchedulerConfiguration) error {
	if err := r.txn.Insert("scheduler_config", schedConfig); err != nil {
		return fmt.Errorf("inserting scheduler config failed: %s", err)
	}
	return nil
}
// ClusterMetadataRestore is used to restore the cluster metadata
func (r *StateRestore) ClusterMetadataRestore(meta *structs.ClusterMetadata) error {
	if err := r.txn.Insert("cluster_meta", meta); err != nil {
		return fmt.Errorf("inserting cluster meta failed: %v", err)
	}
	return nil
}

// ScalingPolicyRestore is used to restore a scaling policy
func (r *StateRestore) ScalingPolicyRestore(scalingPolicy *structs.ScalingPolicy) error {
	if err := r.txn.Insert("scaling_policy", scalingPolicy); err != nil {
		return fmt.Errorf("scaling policy insert failed: %v", err)
	}
	return nil
}

// CSIPluginRestore is used to restore a CSI plugin
func (r *StateRestore) CSIPluginRestore(plugin *structs.CSIPlugin) error {
	if err := r.txn.Insert("csi_plugins", plugin); err != nil {
		return fmt.Errorf("csi plugin insert failed: %v", err)
	}
	return nil
}

// CSIVolumeRestore is used to restore a CSI volume
func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
	if err := r.txn.Insert("csi_volumes", volume); err != nil {
		return fmt.Errorf("csi volume insert failed: %v", err)
	}
	return nil
}

// ScalingEventsRestore is used to restore scaling events for a job
func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error {
	if err := r.txn.Insert("scaling_event", jobEvents); err != nil {
		return fmt.Errorf("scaling event insert failed: %v", err)
	}
	return nil
}

// NamespaceRestore is used to restore a namespace
func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error {
	if err := r.txn.Insert(TableNamespaces, ns); err != nil {
		return fmt.Errorf("namespace insert failed: %v", err)
	}
	return nil
}