1package state
2
3import (
4	"context"
5	"fmt"
6	"reflect"
7	"sort"
8	"strings"
9	"time"
10
11	log "github.com/hashicorp/go-hclog"
12	memdb "github.com/hashicorp/go-memdb"
13	multierror "github.com/hashicorp/go-multierror"
14	"github.com/pkg/errors"
15
16	"github.com/hashicorp/nomad/helper"
17	"github.com/hashicorp/nomad/nomad/stream"
18	"github.com/hashicorp/nomad/nomad/structs"
19)
20
21// Txn is a transaction against a state store.
22// This can be a read or write transaction.
23type Txn = *txn
24
25const (
26	// NodeRegisterEventReregistered is the message used when the node becomes
27	// reregistered.
28	NodeRegisterEventRegistered = "Node registered"
29
30	// NodeRegisterEventReregistered is the message used when the node becomes
31	// reregistered.
32	NodeRegisterEventReregistered = "Node re-registered"
33)
34
35// IndexEntry is used with the "index" table
36// for managing the latest Raft index affecting a table.
37type IndexEntry struct {
38	Key   string
39	Value uint64
40}
41
42// StateStoreConfig is used to configure a new state store
43type StateStoreConfig struct {
44	// Logger is used to output the state store's logs
45	Logger log.Logger
46
47	// Region is the region of the server embedding the state store.
48	Region string
49
50	// EnablePublisher is used to enable or disable the event publisher
51	EnablePublisher bool
52
53	// EventBufferSize configures the amount of events to hold in memory
54	EventBufferSize int64
55}
56
57// The StateStore is responsible for maintaining all the Nomad
58// state. It is manipulated by the FSM which maintains consistency
59// through the use of Raft. The goals of the StateStore are to provide
60// high concurrency for read operations without blocking writes, and
61// to provide write availability in the face of reads. EVERY object
62// returned as a result of a read against the state store should be
63// considered a constant and NEVER modified in place.
64type StateStore struct {
65	logger log.Logger
66	db     *changeTrackerDB
67
68	// config is the passed in configuration
69	config *StateStoreConfig
70
71	// abandonCh is used to signal watchers that this state store has been
72	// abandoned (usually during a restore). This is only ever closed.
73	abandonCh chan struct{}
74
75	// TODO: refactor abandonCh to use a context so that both can use the same
76	// cancel mechanism.
77	stopEventBroker func()
78}
79
80type streamACLDelegate struct {
81	s *StateStore
82}
83
84func (a *streamACLDelegate) TokenProvider() stream.ACLTokenProvider {
85	resolver, _ := a.s.Snapshot()
86	return resolver
87}
88
89// NewStateStore is used to create a new state store
90func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
91	// Create the MemDB
92	db, err := memdb.NewMemDB(stateStoreSchema())
93	if err != nil {
94		return nil, fmt.Errorf("state store setup failed: %v", err)
95	}
96
97	// Create the state store
98	ctx, cancel := context.WithCancel(context.TODO())
99	s := &StateStore{
100		logger:          config.Logger.Named("state_store"),
101		config:          config,
102		abandonCh:       make(chan struct{}),
103		stopEventBroker: cancel,
104	}
105
106	if config.EnablePublisher {
107		// Create new event publisher using provided config
108		broker, err := stream.NewEventBroker(ctx, &streamACLDelegate{s}, stream.EventBrokerCfg{
109			EventBufferSize: config.EventBufferSize,
110			Logger:          config.Logger,
111		})
112		if err != nil {
113			return nil, fmt.Errorf("creating state store event broker %w", err)
114		}
115		s.db = NewChangeTrackerDB(db, broker, eventsFromChanges)
116	} else {
117		s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges)
118	}
119
120	// Initialize the state store with the default namespace.
121	if err := s.namespaceInit(); err != nil {
122		return nil, fmt.Errorf("enterprise state store initialization failed: %v", err)
123	}
124
125	return s, nil
126}
127
128// NewWatchSet returns a new memdb.WatchSet that adds the state stores abandonCh
129// as a watcher. This is important in that it will notify when this specific
130// state store is no longer valid, usually due to a new snapshot being loaded
131func (s *StateStore) NewWatchSet() memdb.WatchSet {
132	ws := memdb.NewWatchSet()
133	ws.Add(s.AbandonCh())
134	return ws
135}
136
137func (s *StateStore) EventBroker() (*stream.EventBroker, error) {
138	if s.db.publisher == nil {
139		return nil, fmt.Errorf("EventBroker not configured")
140	}
141	return s.db.publisher, nil
142}
143
144// namespaceInit ensures the default namespace exists.
145func (s *StateStore) namespaceInit() error {
146	// Create the default namespace. This is safe to do every time we create the
147	// state store. There are two main cases, a brand new cluster in which case
148	// each server will have the same default namespace object, or a new cluster
149	// in which case if the default namespace has been modified, it will be
150	// overridden by the restore code path.
151	defaultNs := &structs.Namespace{
152		Name:        structs.DefaultNamespace,
153		Description: structs.DefaultNamespaceDescription,
154	}
155
156	if err := s.UpsertNamespaces(1, []*structs.Namespace{defaultNs}); err != nil {
157		return fmt.Errorf("inserting default namespace failed: %v", err)
158	}
159
160	return nil
161}
162
163// Config returns the state store configuration.
164func (s *StateStore) Config() *StateStoreConfig {
165	return s.config
166}
167
168// Snapshot is used to create a point in time snapshot. Because
169// we use MemDB, we just need to snapshot the state of the underlying
170// database.
171func (s *StateStore) Snapshot() (*StateSnapshot, error) {
172	memDBSnap := s.db.memdb.Snapshot()
173
174	store := StateStore{
175		logger: s.logger,
176		config: s.config,
177	}
178
179	// Create a new change tracker DB that does not publish or track changes
180	store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges)
181
182	snap := &StateSnapshot{
183		StateStore: store,
184	}
185	return snap, nil
186}
187
188// SnapshotMinIndex is used to create a state snapshot where the index is
189// guaranteed to be greater than or equal to the index parameter.
190//
191// Some server operations (such as scheduling) exchange objects via RPC
192// concurrent with Raft log application, so they must ensure the state store
193// snapshot they are operating on is at or after the index the objects
194// retrieved via RPC were applied to the Raft log at.
195//
196// Callers should maintain their own timer metric as the time this method
197// blocks indicates Raft log application latency relative to scheduling.
198func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) {
199	// Ported from work.go:waitForIndex prior to 0.9
200
201	const backoffBase = 20 * time.Millisecond
202	const backoffLimit = 1 * time.Second
203	var retries uint
204	var retryTimer *time.Timer
205
206	// XXX: Potential optimization is to set up a watch on the state
207	// store's index table and only unblock via a trigger rather than
208	// polling.
209	for {
210		// Get the states current index
211		snapshotIndex, err := s.LatestIndex()
212		if err != nil {
213			return nil, fmt.Errorf("failed to determine state store's index: %v", err)
214		}
215
216		// We only need the FSM state to be as recent as the given index
217		if snapshotIndex >= index {
218			return s.Snapshot()
219		}
220
221		// Exponential back off
222		retries++
223		if retryTimer == nil {
224			// First retry, start at baseline
225			retryTimer = time.NewTimer(backoffBase)
226		} else {
227			// Subsequent retry, reset timer
228			deadline := 1 << (2 * retries) * backoffBase
229			if deadline > backoffLimit {
230				deadline = backoffLimit
231			}
232			retryTimer.Reset(deadline)
233		}
234
235		select {
236		case <-ctx.Done():
237			return nil, ctx.Err()
238		case <-retryTimer.C:
239		}
240	}
241}
242
243// Restore is used to optimize the efficiency of rebuilding
244// state by minimizing the number of transactions and checking
245// overhead.
246func (s *StateStore) Restore() (*StateRestore, error) {
247	txn := s.db.WriteTxnRestore()
248	r := &StateRestore{
249		txn: txn,
250	}
251	return r, nil
252}
253
254// AbandonCh returns a channel you can wait on to know if the state store was
255// abandoned.
256func (s *StateStore) AbandonCh() <-chan struct{} {
257	return s.abandonCh
258}
259
260// Abandon is used to signal that the given state store has been abandoned.
261// Calling this more than one time will panic.
262func (s *StateStore) Abandon() {
263	s.StopEventBroker()
264	close(s.abandonCh)
265}
266
267// StopStopEventBroker calls the cancel func for the state stores event
268// publisher. It should be called during server shutdown.
269func (s *StateStore) StopEventBroker() {
270	s.stopEventBroker()
271}
272
273// QueryFn is the definition of a function that can be used to implement a basic
274// blocking query against the state store.
275type QueryFn func(memdb.WatchSet, *StateStore) (resp interface{}, index uint64, err error)
276
277// BlockingQuery takes a query function and runs the function until the minimum
278// query index is met or until the passed context is cancelled.
279func (s *StateStore) BlockingQuery(query QueryFn, minIndex uint64, ctx context.Context) (
280	resp interface{}, index uint64, err error) {
281
282RUN_QUERY:
283	// We capture the state store and its abandon channel but pass a snapshot to
284	// the blocking query function. We operate on the snapshot to allow separate
285	// calls to the state store not all wrapped within the same transaction.
286	abandonCh := s.AbandonCh()
287	snap, _ := s.Snapshot()
288	stateSnap := &snap.StateStore
289
290	// We can skip all watch tracking if this isn't a blocking query.
291	var ws memdb.WatchSet
292	if minIndex > 0 {
293		ws = memdb.NewWatchSet()
294
295		// This channel will be closed if a snapshot is restored and the
296		// whole state store is abandoned.
297		ws.Add(abandonCh)
298	}
299
300	resp, index, err = query(ws, stateSnap)
301	if err != nil {
302		return nil, index, err
303	}
304
305	// We haven't reached the min-index yet.
306	if minIndex > 0 && index <= minIndex {
307		if err := ws.WatchCtx(ctx); err != nil {
308			return nil, index, err
309		}
310
311		goto RUN_QUERY
312	}
313
314	return resp, index, nil
315}
316
317// UpsertPlanResults is used to upsert the results of a plan.
318func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64, results *structs.ApplyPlanResultsRequest) error {
319	snapshot, err := s.Snapshot()
320	if err != nil {
321		return err
322	}
323
324	allocsStopped, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsStopped)
325	if err != nil {
326		return err
327	}
328
329	allocsPreempted, err := snapshot.DenormalizeAllocationDiffSlice(results.AllocsPreempted)
330	if err != nil {
331		return err
332	}
333
334	// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
335	results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions)
336	if err != nil {
337		return err
338	}
339
340	txn := s.db.WriteTxnMsgT(msgType, index)
341	defer txn.Abort()
342
343	// Upsert the newly created or updated deployment
344	if results.Deployment != nil {
345		if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil {
346			return err
347		}
348	}
349
350	// Update the status of deployments effected by the plan.
351	if len(results.DeploymentUpdates) != 0 {
352		s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn)
353	}
354
355	if results.EvalID != "" {
356		// Update the modify index of the eval id
357		if err := s.updateEvalModifyIndex(txn, index, results.EvalID); err != nil {
358			return err
359		}
360	}
361
362	numAllocs := 0
363	if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 {
364		// COMPAT 0.11: This branch will be removed, when Alloc is removed
365		// Attach the job to all the allocations. It is pulled out in the payload to
366		// avoid the redundancy of encoding, but should be denormalized prior to
367		// being inserted into MemDB.
368		addComputedAllocAttrs(results.Alloc, results.Job)
369		numAllocs = len(results.Alloc) + len(results.NodePreemptions)
370	} else {
371		// Attach the job to all the allocations. It is pulled out in the payload to
372		// avoid the redundancy of encoding, but should be denormalized prior to
373		// being inserted into MemDB.
374		addComputedAllocAttrs(results.AllocsUpdated, results.Job)
375		numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
376	}
377
378	allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
379
380	// COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed
381	allocsToUpsert = append(allocsToUpsert, results.Alloc...)
382	allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
383
384	allocsToUpsert = append(allocsToUpsert, allocsStopped...)
385	allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
386	allocsToUpsert = append(allocsToUpsert, allocsPreempted...)
387
388	// handle upgrade path
389	for _, alloc := range allocsToUpsert {
390		alloc.Canonicalize()
391	}
392
393	if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
394		return err
395	}
396
397	// Upsert followup evals for allocs that were preempted
398	for _, eval := range results.PreemptionEvals {
399		if err := s.nestedUpsertEval(txn, index, eval); err != nil {
400			return err
401		}
402	}
403
404	return txn.Commit()
405}
406
407// addComputedAllocAttrs adds the computed/derived attributes to the allocation.
408// This method is used when an allocation is being denormalized.
409func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) {
410	structs.DenormalizeAllocationJobs(job, allocs)
411
412	// COMPAT(0.11): Remove in 0.11
413	// Calculate the total resources of allocations. It is pulled out in the
414	// payload to avoid encoding something that can be computed, but should be
415	// denormalized prior to being inserted into MemDB.
416	for _, alloc := range allocs {
417		if alloc.Resources != nil {
418			continue
419		}
420
421		alloc.Resources = new(structs.Resources)
422		for _, task := range alloc.TaskResources {
423			alloc.Resources.Add(task)
424		}
425
426		// Add the shared resources
427		alloc.Resources.Add(alloc.SharedResources)
428	}
429}
430
431// upsertDeploymentUpdates updates the deployments given the passed status
432// updates.
433func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *txn) error {
434	for _, u := range updates {
435		if err := s.updateDeploymentStatusImpl(index, u, txn); err != nil {
436			return err
437		}
438	}
439
440	return nil
441}
442
443// UpsertJobSummary upserts a job summary into the state store.
444func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error {
445	txn := s.db.WriteTxn(index)
446	defer txn.Abort()
447
448	// Check if the job summary already exists
449	existing, err := txn.First("job_summary", "id", jobSummary.Namespace, jobSummary.JobID)
450	if err != nil {
451		return fmt.Errorf("job summary lookup failed: %v", err)
452	}
453
454	// Setup the indexes correctly
455	if existing != nil {
456		jobSummary.CreateIndex = existing.(*structs.JobSummary).CreateIndex
457		jobSummary.ModifyIndex = index
458	} else {
459		jobSummary.CreateIndex = index
460		jobSummary.ModifyIndex = index
461	}
462
463	// Update the index
464	if err := txn.Insert("job_summary", jobSummary); err != nil {
465		return err
466	}
467
468	// Update the indexes table for job summary
469	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
470		return fmt.Errorf("index update failed: %v", err)
471	}
472
473	return txn.Commit()
474}
475
476// DeleteJobSummary deletes the job summary with the given ID. This is for
477// testing purposes only.
478func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error {
479	txn := s.db.WriteTxn(index)
480	defer txn.Abort()
481
482	// Delete the job summary
483	if _, err := txn.DeleteAll("job_summary", "id", namespace, id); err != nil {
484		return fmt.Errorf("deleting job summary failed: %v", err)
485	}
486	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
487		return fmt.Errorf("index update failed: %v", err)
488	}
489	return txn.Commit()
490}
491
492// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to
493// true, all prior deployments for the same job will be cancelled.
494func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error {
495	txn := s.db.WriteTxn(index)
496	defer txn.Abort()
497	if err := s.upsertDeploymentImpl(index, deployment, txn); err != nil {
498		return err
499	}
500	return txn.Commit()
501}
502
503func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, txn *txn) error {
504	// Check if the deployment already exists
505	existing, err := txn.First("deployment", "id", deployment.ID)
506	if err != nil {
507		return fmt.Errorf("deployment lookup failed: %v", err)
508	}
509
510	// Setup the indexes correctly
511	if existing != nil {
512		deployment.CreateIndex = existing.(*structs.Deployment).CreateIndex
513		deployment.ModifyIndex = index
514	} else {
515		deployment.CreateIndex = index
516		deployment.ModifyIndex = index
517	}
518
519	// Insert the deployment
520	if err := txn.Insert("deployment", deployment); err != nil {
521		return err
522	}
523
524	// Update the indexes table for deployment
525	if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
526		return fmt.Errorf("index update failed: %v", err)
527	}
528
529	// If the deployment is being marked as complete, set the job to stable.
530	if deployment.Status == structs.DeploymentStatusSuccessful {
531		if err := s.updateJobStabilityImpl(index, deployment.Namespace, deployment.JobID, deployment.JobVersion, true, txn); err != nil {
532			return fmt.Errorf("failed to update job stability: %v", err)
533		}
534	}
535
536	return nil
537}
538
539func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) {
540	txn := s.db.ReadTxn()
541
542	// Walk the entire deployments table
543	iter, err := txn.Get("deployment", "id")
544	if err != nil {
545		return nil, err
546	}
547
548	ws.Add(iter.WatchCh())
549	return iter, nil
550}
551
552func (s *StateStore) DeploymentsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
553	txn := s.db.ReadTxn()
554
555	// Walk the entire deployments table
556	iter, err := txn.Get("deployment", "namespace", namespace)
557	if err != nil {
558		return nil, err
559	}
560
561	ws.Add(iter.WatchCh())
562	return iter, nil
563}
564
565func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, namespace, deploymentID string) (memdb.ResultIterator, error) {
566	txn := s.db.ReadTxn()
567
568	// Walk the entire deployments table
569	iter, err := txn.Get("deployment", "id_prefix", deploymentID)
570	if err != nil {
571		return nil, err
572	}
573
574	ws.Add(iter.WatchCh())
575
576	// Wrap the iterator in a filter
577	wrap := memdb.NewFilterIterator(iter, deploymentNamespaceFilter(namespace))
578	return wrap, nil
579}
580
581// deploymentNamespaceFilter returns a filter function that filters all
582// deployment not in the given namespace.
583func deploymentNamespaceFilter(namespace string) func(interface{}) bool {
584	return func(raw interface{}) bool {
585		d, ok := raw.(*structs.Deployment)
586		if !ok {
587			return true
588		}
589
590		return d.Namespace != namespace
591	}
592}
593
594func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) {
595	txn := s.db.ReadTxn()
596	return s.deploymentByIDImpl(ws, deploymentID, txn)
597}
598
599func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, txn *txn) (*structs.Deployment, error) {
600	watchCh, existing, err := txn.FirstWatch("deployment", "id", deploymentID)
601	if err != nil {
602		return nil, fmt.Errorf("deployment lookup failed: %v", err)
603	}
604	ws.Add(watchCh)
605
606	if existing != nil {
607		return existing.(*structs.Deployment), nil
608	}
609
610	return nil, nil
611}
612
613func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error) {
614	txn := s.db.ReadTxn()
615
616	var job *structs.Job
617	// Read job from state store
618	_, existing, err := txn.FirstWatch("jobs", "id", namespace, jobID)
619	if err != nil {
620		return nil, fmt.Errorf("job lookup failed: %v", err)
621	}
622	if existing != nil {
623		job = existing.(*structs.Job)
624	}
625
626	// Get an iterator over the deployments
627	iter, err := txn.Get("deployment", "job", namespace, jobID)
628	if err != nil {
629		return nil, err
630	}
631
632	ws.Add(iter.WatchCh())
633
634	var out []*structs.Deployment
635	for {
636		raw := iter.Next()
637		if raw == nil {
638			break
639		}
640		d := raw.(*structs.Deployment)
641
642		// If the allocation belongs to a job with the same ID but a different
643		// create index and we are not getting all the allocations whose Jobs
644		// matches the same Job ID then we skip it
645		if !all && job != nil && d.JobCreateIndex != job.CreateIndex {
646			continue
647		}
648		out = append(out, d)
649	}
650
651	return out, nil
652}
653
654// LatestDeploymentByJobID returns the latest deployment for the given job. The
655// latest is determined strictly by CreateIndex.
656func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) {
657	txn := s.db.ReadTxn()
658
659	// Get an iterator over the deployments
660	iter, err := txn.Get("deployment", "job", namespace, jobID)
661	if err != nil {
662		return nil, err
663	}
664
665	ws.Add(iter.WatchCh())
666
667	var out *structs.Deployment
668	for {
669		raw := iter.Next()
670		if raw == nil {
671			break
672		}
673
674		d := raw.(*structs.Deployment)
675		if out == nil || out.CreateIndex < d.CreateIndex {
676			out = d
677		}
678	}
679
680	return out, nil
681}
682
683// DeleteDeployment is used to delete a set of deployments by ID
684func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) error {
685	txn := s.db.WriteTxn(index)
686	defer txn.Abort()
687
688	if len(deploymentIDs) == 0 {
689		return nil
690	}
691
692	for _, deploymentID := range deploymentIDs {
693		// Lookup the deployment
694		existing, err := txn.First("deployment", "id", deploymentID)
695		if err != nil {
696			return fmt.Errorf("deployment lookup failed: %v", err)
697		}
698		if existing == nil {
699			return fmt.Errorf("deployment not found")
700		}
701
702		// Delete the deployment
703		if err := txn.Delete("deployment", existing); err != nil {
704			return fmt.Errorf("deployment delete failed: %v", err)
705		}
706	}
707
708	if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
709		return fmt.Errorf("index update failed: %v", err)
710	}
711
712	return txn.Commit()
713}
714
715// UpsertScalingEvent is used to insert a new scaling event.
716// Only the most recent JobTrackedScalingEvents will be kept.
717func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventRequest) error {
718	txn := s.db.WriteTxn(index)
719	defer txn.Abort()
720
721	// Get the existing events
722	existing, err := txn.First("scaling_event", "id", req.Namespace, req.JobID)
723	if err != nil {
724		return fmt.Errorf("scaling event lookup failed: %v", err)
725	}
726
727	var jobEvents *structs.JobScalingEvents
728	if existing != nil {
729		jobEvents = existing.(*structs.JobScalingEvents)
730	} else {
731		jobEvents = &structs.JobScalingEvents{
732			Namespace:     req.Namespace,
733			JobID:         req.JobID,
734			ScalingEvents: make(map[string][]*structs.ScalingEvent),
735		}
736	}
737
738	jobEvents.ModifyIndex = index
739	req.ScalingEvent.CreateIndex = index
740
741	events := jobEvents.ScalingEvents[req.TaskGroup]
742	// Prepend this latest event
743	events = append(
744		[]*structs.ScalingEvent{req.ScalingEvent},
745		events...,
746	)
747	// Truncate older events
748	if len(events) > structs.JobTrackedScalingEvents {
749		events = events[0:structs.JobTrackedScalingEvents]
750	}
751	jobEvents.ScalingEvents[req.TaskGroup] = events
752
753	// Insert the new event
754	if err := txn.Insert("scaling_event", jobEvents); err != nil {
755		return fmt.Errorf("scaling event insert failed: %v", err)
756	}
757
758	// Update the indexes table for scaling_event
759	if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
760		return fmt.Errorf("index update failed: %v", err)
761	}
762
763	return txn.Commit()
764}
765
766// ScalingEvents returns an iterator over all the job scaling events
767func (s *StateStore) ScalingEvents(ws memdb.WatchSet) (memdb.ResultIterator, error) {
768	txn := s.db.ReadTxn()
769
770	// Walk the entire scaling_event table
771	iter, err := txn.Get("scaling_event", "id")
772	if err != nil {
773		return nil, err
774	}
775
776	ws.Add(iter.WatchCh())
777
778	return iter, nil
779}
780
781func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID string) (map[string][]*structs.ScalingEvent, uint64, error) {
782	txn := s.db.ReadTxn()
783
784	watchCh, existing, err := txn.FirstWatch("scaling_event", "id", namespace, jobID)
785	if err != nil {
786		return nil, 0, fmt.Errorf("job scaling events lookup failed: %v", err)
787	}
788	ws.Add(watchCh)
789
790	if existing != nil {
791		events := existing.(*structs.JobScalingEvents)
792		return events.ScalingEvents, events.ModifyIndex, nil
793	}
794	return nil, 0, nil
795}
796
797// UpsertNode is used to register a node or update a node definition
798// This is assumed to be triggered by the client, so we retain the value
799// of drain/eligibility which is set by the scheduler.
800func (s *StateStore) UpsertNode(msgType structs.MessageType, index uint64, node *structs.Node) error {
801	txn := s.db.WriteTxnMsgT(msgType, index)
802	defer txn.Abort()
803
804	err := upsertNodeTxn(txn, index, node)
805	if err != nil {
806		return nil
807	}
808	return txn.Commit()
809}
810
811func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
812	// Check if the node already exists
813	existing, err := txn.First("nodes", "id", node.ID)
814	if err != nil {
815		return fmt.Errorf("node lookup failed: %v", err)
816	}
817
818	// Setup the indexes correctly
819	if existing != nil {
820		exist := existing.(*structs.Node)
821		node.CreateIndex = exist.CreateIndex
822		node.ModifyIndex = index
823
824		// Retain node events that have already been set on the node
825		node.Events = exist.Events
826
827		// If we are transitioning from down, record the re-registration
828		if exist.Status == structs.NodeStatusDown && node.Status != structs.NodeStatusDown {
829			appendNodeEvents(index, node, []*structs.NodeEvent{
830				structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
831					SetMessage(NodeRegisterEventReregistered).
832					SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))})
833		}
834
835		node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
836		node.DrainStrategy = exist.DrainStrategy                 // Retain the drain strategy
837		node.LastDrain = exist.LastDrain                         // Retain the drain metadata
838	} else {
839		// Because this is the first time the node is being registered, we should
840		// also create a node registration event
841		nodeEvent := structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).
842			SetMessage(NodeRegisterEventRegistered).
843			SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))
844		node.Events = []*structs.NodeEvent{nodeEvent}
845		node.CreateIndex = index
846		node.ModifyIndex = index
847	}
848
849	// Insert the node
850	if err := txn.Insert("nodes", node); err != nil {
851		return fmt.Errorf("node insert failed: %v", err)
852	}
853	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
854		return fmt.Errorf("index update failed: %v", err)
855	}
856	if err := upsertNodeCSIPlugins(txn, node, index); err != nil {
857		return fmt.Errorf("csi plugin update failed: %v", err)
858	}
859
860	return nil
861}
862
863// DeleteNode deregisters a batch of nodes
864func (s *StateStore) DeleteNode(msgType structs.MessageType, index uint64, nodes []string) error {
865	txn := s.db.WriteTxn(index)
866	defer txn.Abort()
867
868	err := deleteNodeTxn(txn, index, nodes)
869	if err != nil {
870		return nil
871	}
872	return txn.Commit()
873}
874
875func deleteNodeTxn(txn *txn, index uint64, nodes []string) error {
876	if len(nodes) == 0 {
877		return fmt.Errorf("node ids missing")
878	}
879
880	for _, nodeID := range nodes {
881		existing, err := txn.First("nodes", "id", nodeID)
882		if err != nil {
883			return fmt.Errorf("node lookup failed: %s: %v", nodeID, err)
884		}
885		if existing == nil {
886			return fmt.Errorf("node not found: %s", nodeID)
887		}
888
889		// Delete the node
890		if err := txn.Delete("nodes", existing); err != nil {
891			return fmt.Errorf("node delete failed: %s: %v", nodeID, err)
892		}
893
894		node := existing.(*structs.Node)
895		if err := deleteNodeCSIPlugins(txn, node, index); err != nil {
896			return fmt.Errorf("csi plugin delete failed: %v", err)
897		}
898	}
899
900	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
901		return fmt.Errorf("index update failed: %v", err)
902	}
903
904	return nil
905}
906
907// UpdateNodeStatus is used to update the status of a node
908func (s *StateStore) UpdateNodeStatus(msgType structs.MessageType, index uint64, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
909	txn := s.db.WriteTxnMsgT(msgType, index)
910	defer txn.Abort()
911
912	if err := s.updateNodeStatusTxn(txn, nodeID, status, updatedAt, event); err != nil {
913		return err
914	}
915
916	return txn.Commit()
917}
918
919func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, updatedAt int64, event *structs.NodeEvent) error {
920
921	// Lookup the node
922	existing, err := txn.First("nodes", "id", nodeID)
923	if err != nil {
924		return fmt.Errorf("node lookup failed: %v", err)
925	}
926	if existing == nil {
927		return fmt.Errorf("node not found")
928	}
929
930	// Copy the existing node
931	existingNode := existing.(*structs.Node)
932	copyNode := existingNode.Copy()
933	copyNode.StatusUpdatedAt = updatedAt
934
935	// Add the event if given
936	if event != nil {
937		appendNodeEvents(txn.Index, copyNode, []*structs.NodeEvent{event})
938	}
939
940	// Update the status in the copy
941	copyNode.Status = status
942	copyNode.ModifyIndex = txn.Index
943
944	// Insert the node
945	if err := txn.Insert("nodes", copyNode); err != nil {
946		return fmt.Errorf("node update failed: %v", err)
947	}
948	if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil {
949		return fmt.Errorf("index update failed: %v", err)
950	}
951	return nil
952}
953
954// BatchUpdateNodeDrain is used to update the drain of a node set of nodes.
955// This is currently only called when node drain is completed by the drainer.
956func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64,
957	updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error {
958	txn := s.db.WriteTxnMsgT(msgType, index)
959	defer txn.Abort()
960	for node, update := range updates {
961		if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt,
962			events[node], nil, "", true); err != nil {
963			return err
964		}
965	}
966	return txn.Commit()
967}
968
969// UpdateNodeDrain is used to update the drain of a node
970func (s *StateStore) UpdateNodeDrain(msgType structs.MessageType, index uint64, nodeID string,
971	drain *structs.DrainStrategy, markEligible bool, updatedAt int64,
972	event *structs.NodeEvent, drainMeta map[string]string, accessorId string) error {
973
974	txn := s.db.WriteTxnMsgT(msgType, index)
975	defer txn.Abort()
976	if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event,
977		drainMeta, accessorId, false); err != nil {
978
979		return err
980	}
981	return txn.Commit()
982}
983
984func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string,
985	drain *structs.DrainStrategy, markEligible bool, updatedAt int64,
986	event *structs.NodeEvent, drainMeta map[string]string, accessorId string,
987	drainCompleted bool) error {
988
989	// Lookup the node
990	existing, err := txn.First("nodes", "id", nodeID)
991	if err != nil {
992		return fmt.Errorf("node lookup failed: %v", err)
993	}
994	if existing == nil {
995		return fmt.Errorf("node not found")
996	}
997
998	// Copy the existing node
999	existingNode := existing.(*structs.Node)
1000	updatedNode := existingNode.Copy()
1001	updatedNode.StatusUpdatedAt = updatedAt
1002
1003	// Add the event if given
1004	if event != nil {
1005		appendNodeEvents(index, updatedNode, []*structs.NodeEvent{event})
1006	}
1007
1008	// Update the drain in the copy
1009	updatedNode.DrainStrategy = drain
1010	if drain != nil {
1011		updatedNode.SchedulingEligibility = structs.NodeSchedulingIneligible
1012	} else if markEligible {
1013		updatedNode.SchedulingEligibility = structs.NodeSchedulingEligible
1014	}
1015
1016	// Update LastDrain
1017	updateTime := time.Unix(updatedAt, 0)
1018
1019	// if drain strategy isn't set before or after, this wasn't a drain operation
1020	// in that case, we don't care about .LastDrain
1021	drainNoop := existingNode.DrainStrategy == nil && updatedNode.DrainStrategy == nil
1022	// otherwise, when done with this method, updatedNode.LastDrain should be set
1023	// if starting a new drain operation, create a new LastDrain. otherwise, update the existing one.
1024	startedDraining := existingNode.DrainStrategy == nil && updatedNode.DrainStrategy != nil
1025	if !drainNoop {
1026		if startedDraining {
1027			updatedNode.LastDrain = &structs.DrainMetadata{
1028				StartedAt: updateTime,
1029				Meta:      drainMeta,
1030			}
1031		} else if updatedNode.LastDrain == nil {
1032			// if already draining and LastDrain doesn't exist, we need to create a new one
1033			// this could happen if we upgraded to 1.1.x during a drain
1034			updatedNode.LastDrain = &structs.DrainMetadata{
1035				// we don't have sub-second accuracy on these fields, so truncate this
1036				StartedAt: time.Unix(existingNode.DrainStrategy.StartedAt.Unix(), 0),
1037				Meta:      drainMeta,
1038			}
1039		}
1040
1041		updatedNode.LastDrain.UpdatedAt = updateTime
1042
1043		// won't have new metadata on drain complete; keep the existing operator-provided metadata
1044		// also, keep existing if they didn't provide it
1045		if len(drainMeta) != 0 {
1046			updatedNode.LastDrain.Meta = drainMeta
1047		}
1048
1049		// we won't have an accessor ID on drain complete, so don't overwrite the existing one
1050		if accessorId != "" {
1051			updatedNode.LastDrain.AccessorID = accessorId
1052		}
1053
1054		if updatedNode.DrainStrategy != nil {
1055			updatedNode.LastDrain.Status = structs.DrainStatusDraining
1056		} else if drainCompleted {
1057			updatedNode.LastDrain.Status = structs.DrainStatusComplete
1058		} else {
1059			updatedNode.LastDrain.Status = structs.DrainStatusCanceled
1060		}
1061	}
1062
1063	updatedNode.ModifyIndex = index
1064
1065	// Insert the node
1066	if err := txn.Insert("nodes", updatedNode); err != nil {
1067		return fmt.Errorf("node update failed: %v", err)
1068	}
1069	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
1070		return fmt.Errorf("index update failed: %v", err)
1071	}
1072
1073	return nil
1074}
1075
1076// UpdateNodeEligibility is used to update the scheduling eligibility of a node
1077func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error {
1078
1079	txn := s.db.WriteTxnMsgT(msgType, index)
1080	defer txn.Abort()
1081
1082	// Lookup the node
1083	existing, err := txn.First("nodes", "id", nodeID)
1084	if err != nil {
1085		return fmt.Errorf("node lookup failed: %v", err)
1086	}
1087	if existing == nil {
1088		return fmt.Errorf("node not found")
1089	}
1090
1091	// Copy the existing node
1092	existingNode := existing.(*structs.Node)
1093	copyNode := existingNode.Copy()
1094	copyNode.StatusUpdatedAt = updatedAt
1095
1096	// Add the event if given
1097	if event != nil {
1098		appendNodeEvents(index, copyNode, []*structs.NodeEvent{event})
1099	}
1100
1101	// Check if this is a valid action
1102	if copyNode.DrainStrategy != nil && eligibility == structs.NodeSchedulingEligible {
1103		return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
1104	}
1105
1106	// Update the eligibility in the copy
1107	copyNode.SchedulingEligibility = eligibility
1108	copyNode.ModifyIndex = index
1109
1110	// Insert the node
1111	if err := txn.Insert("nodes", copyNode); err != nil {
1112		return fmt.Errorf("node update failed: %v", err)
1113	}
1114	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
1115		return fmt.Errorf("index update failed: %v", err)
1116	}
1117
1118	return txn.Commit()
1119}
1120
1121// UpsertNodeEvents adds the node events to the nodes, rotating events as
1122// necessary.
1123func (s *StateStore) UpsertNodeEvents(msgType structs.MessageType, index uint64, nodeEvents map[string][]*structs.NodeEvent) error {
1124	txn := s.db.WriteTxnMsgT(msgType, index)
1125	defer txn.Abort()
1126
1127	for nodeID, events := range nodeEvents {
1128		if err := s.upsertNodeEvents(index, nodeID, events, txn); err != nil {
1129			return err
1130		}
1131	}
1132
1133	return txn.Commit()
1134}
1135
1136// upsertNodeEvent upserts a node event for a respective node. It also maintains
1137// that a fixed number of node events are ever stored simultaneously, deleting
1138// older events once this bound has been reached.
1139func (s *StateStore) upsertNodeEvents(index uint64, nodeID string, events []*structs.NodeEvent, txn *txn) error {
1140	// Lookup the node
1141	existing, err := txn.First("nodes", "id", nodeID)
1142	if err != nil {
1143		return fmt.Errorf("node lookup failed: %v", err)
1144	}
1145	if existing == nil {
1146		return fmt.Errorf("node not found")
1147	}
1148
1149	// Copy the existing node
1150	existingNode := existing.(*structs.Node)
1151	copyNode := existingNode.Copy()
1152	appendNodeEvents(index, copyNode, events)
1153
1154	// Insert the node
1155	if err := txn.Insert("nodes", copyNode); err != nil {
1156		return fmt.Errorf("node update failed: %v", err)
1157	}
1158	if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
1159		return fmt.Errorf("index update failed: %v", err)
1160	}
1161
1162	return nil
1163}
1164
1165// appendNodeEvents is a helper that takes a node and new events and appends
1166// them, pruning older events as needed.
1167func appendNodeEvents(index uint64, node *structs.Node, events []*structs.NodeEvent) {
1168	// Add the events, updating the indexes
1169	for _, e := range events {
1170		e.CreateIndex = index
1171		node.Events = append(node.Events, e)
1172	}
1173
1174	// Keep node events pruned to not exceed the max allowed
1175	if l := len(node.Events); l > structs.MaxRetainedNodeEvents {
1176		delta := l - structs.MaxRetainedNodeEvents
1177		node.Events = node.Events[delta:]
1178	}
1179}
1180
1181// upsertNodeCSIPlugins indexes csi plugins for volume retrieval, with health. It's called
1182// on upsertNodeEvents, so that event driven health changes are updated
1183func upsertNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
1184
1185	loop := func(info *structs.CSIInfo) error {
1186		raw, err := txn.First("csi_plugins", "id", info.PluginID)
1187		if err != nil {
1188			return fmt.Errorf("csi_plugin lookup error: %s %v", info.PluginID, err)
1189		}
1190
1191		var plug *structs.CSIPlugin
1192		if raw != nil {
1193			plug = raw.(*structs.CSIPlugin).Copy()
1194		} else {
1195			if !info.Healthy {
1196				// we don't want to create new plugins for unhealthy
1197				// allocs, otherwise we'd recreate the plugin when we
1198				// get the update for the alloc becoming terminal
1199				return nil
1200			}
1201			plug = structs.NewCSIPlugin(info.PluginID, index)
1202		}
1203
1204		// the plugin may have been created by the job being updated, in which case
1205		// this data will not be configured, it's only available to the fingerprint
1206		// system
1207		plug.Provider = info.Provider
1208		plug.Version = info.ProviderVersion
1209
1210		err = plug.AddPlugin(node.ID, info)
1211		if err != nil {
1212			return err
1213		}
1214
1215		plug.ModifyIndex = index
1216
1217		err = txn.Insert("csi_plugins", plug)
1218		if err != nil {
1219			return fmt.Errorf("csi_plugins insert error: %v", err)
1220		}
1221
1222		return nil
1223	}
1224
1225	inUseController := map[string]struct{}{}
1226	inUseNode := map[string]struct{}{}
1227
1228	for _, info := range node.CSIControllerPlugins {
1229		err := loop(info)
1230		if err != nil {
1231			return err
1232		}
1233		inUseController[info.PluginID] = struct{}{}
1234	}
1235
1236	for _, info := range node.CSINodePlugins {
1237		err := loop(info)
1238		if err != nil {
1239			return err
1240		}
1241		inUseNode[info.PluginID] = struct{}{}
1242	}
1243
1244	// remove the client node from any plugin that's not
1245	// running on it.
1246	iter, err := txn.Get("csi_plugins", "id")
1247	if err != nil {
1248		return fmt.Errorf("csi_plugins lookup failed: %v", err)
1249	}
1250	for {
1251		raw := iter.Next()
1252		if raw == nil {
1253			break
1254		}
1255		plug, ok := raw.(*structs.CSIPlugin)
1256		if !ok {
1257			continue
1258		}
1259		plug = plug.Copy()
1260
1261		var hadDelete bool
1262		if _, ok := inUseController[plug.ID]; !ok {
1263			if _, asController := plug.Controllers[node.ID]; asController {
1264				err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController)
1265				if err != nil {
1266					return err
1267				}
1268				hadDelete = true
1269			}
1270		}
1271		if _, ok := inUseNode[plug.ID]; !ok {
1272			if _, asNode := plug.Nodes[node.ID]; asNode {
1273				err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode)
1274				if err != nil {
1275					return err
1276				}
1277				hadDelete = true
1278			}
1279		}
1280		// we check this flag both for performance and to make sure we
1281		// don't delete a plugin when registering a node plugin but
1282		// no controller
1283		if hadDelete {
1284			err = updateOrGCPlugin(index, txn, plug)
1285			if err != nil {
1286				return err
1287			}
1288		}
1289	}
1290
1291	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
1292		return fmt.Errorf("index update failed: %v", err)
1293	}
1294
1295	return nil
1296}
1297
1298// deleteNodeCSIPlugins cleans up CSIInfo node health status, called in DeleteNode
1299func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
1300	if len(node.CSIControllerPlugins) == 0 && len(node.CSINodePlugins) == 0 {
1301		return nil
1302	}
1303
1304	names := map[string]struct{}{}
1305	for _, info := range node.CSIControllerPlugins {
1306		names[info.PluginID] = struct{}{}
1307	}
1308	for _, info := range node.CSINodePlugins {
1309		names[info.PluginID] = struct{}{}
1310	}
1311
1312	for id := range names {
1313		raw, err := txn.First("csi_plugins", "id", id)
1314		if err != nil {
1315			return fmt.Errorf("csi_plugins lookup error %s: %v", id, err)
1316		}
1317		if raw == nil {
1318			// plugin may have been deregistered but we didn't
1319			// update the fingerprint yet
1320			continue
1321		}
1322
1323		plug := raw.(*structs.CSIPlugin).Copy()
1324		err = plug.DeleteNode(node.ID)
1325		if err != nil {
1326			return err
1327		}
1328		err = updateOrGCPlugin(index, txn, plug)
1329		if err != nil {
1330			return err
1331		}
1332	}
1333
1334	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
1335		return fmt.Errorf("index update failed: %v", err)
1336	}
1337
1338	return nil
1339}
1340
1341// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
1342func updateOrGCPlugin(index uint64, txn Txn, plug *structs.CSIPlugin) error {
1343	plug.ModifyIndex = index
1344
1345	if plug.IsEmpty() {
1346		err := txn.Delete("csi_plugins", plug)
1347		if err != nil {
1348			return fmt.Errorf("csi_plugins delete error: %v", err)
1349		}
1350	} else {
1351		err := txn.Insert("csi_plugins", plug)
1352		if err != nil {
1353			return fmt.Errorf("csi_plugins update error %s: %v", plug.ID, err)
1354		}
1355	}
1356	return nil
1357}
1358
1359// deleteJobFromPlugins removes the allocations of this job from any plugins the job is
1360// running, possibly deleting the plugin if it's no longer in use. It's called in DeleteJobTxn
1361func (s *StateStore) deleteJobFromPlugins(index uint64, txn Txn, job *structs.Job) error {
1362	ws := memdb.NewWatchSet()
1363	summary, err := s.JobSummaryByID(ws, job.Namespace, job.ID)
1364	if err != nil {
1365		return fmt.Errorf("error getting job summary: %v", err)
1366	}
1367
1368	allocs, err := s.AllocsByJob(ws, job.Namespace, job.ID, false)
1369	if err != nil {
1370		return fmt.Errorf("error getting allocations: %v", err)
1371	}
1372
1373	type pair struct {
1374		pluginID string
1375		alloc    *structs.Allocation
1376	}
1377
1378	plugAllocs := []*pair{}
1379	found := map[string]struct{}{}
1380
1381	// Find plugins for allocs that belong to this job
1382	for _, a := range allocs {
1383		tg := a.Job.LookupTaskGroup(a.TaskGroup)
1384		found[tg.Name] = struct{}{}
1385		for _, t := range tg.Tasks {
1386			if t.CSIPluginConfig == nil {
1387				continue
1388			}
1389			plugAllocs = append(plugAllocs, &pair{
1390				pluginID: t.CSIPluginConfig.ID,
1391				alloc:    a,
1392			})
1393		}
1394	}
1395
1396	// Find any plugins that do not yet have allocs for this job
1397	for _, tg := range job.TaskGroups {
1398		if _, ok := found[tg.Name]; ok {
1399			continue
1400		}
1401
1402		for _, t := range tg.Tasks {
1403			if t.CSIPluginConfig == nil {
1404				continue
1405			}
1406			plugAllocs = append(plugAllocs, &pair{
1407				pluginID: t.CSIPluginConfig.ID,
1408			})
1409		}
1410	}
1411
1412	plugins := map[string]*structs.CSIPlugin{}
1413
1414	for _, x := range plugAllocs {
1415		plug, ok := plugins[x.pluginID]
1416
1417		if !ok {
1418			plug, err = s.CSIPluginByIDTxn(txn, nil, x.pluginID)
1419			if err != nil {
1420				return fmt.Errorf("error getting plugin: %s, %v", x.pluginID, err)
1421			}
1422			if plug == nil {
1423				return fmt.Errorf("plugin missing: %s %v", x.pluginID, err)
1424			}
1425			// only copy once, so we update the same plugin on each alloc
1426			plugins[x.pluginID] = plug.Copy()
1427			plug = plugins[x.pluginID]
1428		}
1429
1430		if x.alloc == nil {
1431			continue
1432		}
1433		err := plug.DeleteAlloc(x.alloc.ID, x.alloc.NodeID)
1434		if err != nil {
1435			return err
1436		}
1437	}
1438
1439	for _, plug := range plugins {
1440		plug.DeleteJob(job, summary)
1441		err = updateOrGCPlugin(index, txn, plug)
1442		if err != nil {
1443			return err
1444		}
1445	}
1446
1447	if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
1448		return fmt.Errorf("index update failed: %v", err)
1449	}
1450
1451	return nil
1452}
1453
1454// NodeByID is used to lookup a node by ID
1455func (s *StateStore) NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error) {
1456	txn := s.db.ReadTxn()
1457
1458	watchCh, existing, err := txn.FirstWatch("nodes", "id", nodeID)
1459	if err != nil {
1460		return nil, fmt.Errorf("node lookup failed: %v", err)
1461	}
1462	ws.Add(watchCh)
1463
1464	if existing != nil {
1465		return existing.(*structs.Node), nil
1466	}
1467	return nil, nil
1468}
1469
1470// NodesByIDPrefix is used to lookup nodes by prefix
1471func (s *StateStore) NodesByIDPrefix(ws memdb.WatchSet, nodeID string) (memdb.ResultIterator, error) {
1472	txn := s.db.ReadTxn()
1473
1474	iter, err := txn.Get("nodes", "id_prefix", nodeID)
1475	if err != nil {
1476		return nil, fmt.Errorf("node lookup failed: %v", err)
1477	}
1478	ws.Add(iter.WatchCh())
1479
1480	return iter, nil
1481}
1482
1483// NodeBySecretID is used to lookup a node by SecretID
1484func (s *StateStore) NodeBySecretID(ws memdb.WatchSet, secretID string) (*structs.Node, error) {
1485	txn := s.db.ReadTxn()
1486
1487	watchCh, existing, err := txn.FirstWatch("nodes", "secret_id", secretID)
1488	if err != nil {
1489		return nil, fmt.Errorf("node lookup by SecretID failed: %v", err)
1490	}
1491	ws.Add(watchCh)
1492
1493	if existing != nil {
1494		return existing.(*structs.Node), nil
1495	}
1496	return nil, nil
1497}
1498
1499// Nodes returns an iterator over all the nodes
1500func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
1501	txn := s.db.ReadTxn()
1502
1503	// Walk the entire nodes table
1504	iter, err := txn.Get("nodes", "id")
1505	if err != nil {
1506		return nil, err
1507	}
1508	ws.Add(iter.WatchCh())
1509	return iter, nil
1510}
1511
1512// UpsertJob is used to register a job or update a job definition
1513func (s *StateStore) UpsertJob(msgType structs.MessageType, index uint64, job *structs.Job) error {
1514	txn := s.db.WriteTxnMsgT(msgType, index)
1515	defer txn.Abort()
1516	if err := s.upsertJobImpl(index, job, false, txn); err != nil {
1517		return err
1518	}
1519	return txn.Commit()
1520}
1521
1522// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob,
1523// but in a transaction.  Useful for when making multiple modifications atomically
1524func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error {
1525	return s.upsertJobImpl(index, job, false, txn)
1526}
1527
1528// upsertJobImpl is the implementation for registering a job or updating a job definition
1529func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *txn) error {
1530	// Assert the namespace exists
1531	if exists, err := s.namespaceExists(txn, job.Namespace); err != nil {
1532		return err
1533	} else if !exists {
1534		return fmt.Errorf("job %q is in nonexistent namespace %q", job.ID, job.Namespace)
1535	}
1536
1537	// Check if the job already exists
1538	existing, err := txn.First("jobs", "id", job.Namespace, job.ID)
1539	var existingJob *structs.Job
1540	if err != nil {
1541		return fmt.Errorf("job lookup failed: %v", err)
1542	}
1543
1544	// Setup the indexes correctly
1545	if existing != nil {
1546		job.CreateIndex = existing.(*structs.Job).CreateIndex
1547		job.ModifyIndex = index
1548
1549		existingJob = existing.(*structs.Job)
1550
1551		// Bump the version unless asked to keep it. This should only be done
1552		// when changing an internal field such as Stable. A spec change should
1553		// always come with a version bump
1554		if !keepVersion {
1555			job.JobModifyIndex = index
1556			if job.Version <= existingJob.Version {
1557				job.Version = existingJob.Version + 1
1558			}
1559		}
1560
1561		// Compute the job status
1562		var err error
1563		job.Status, err = s.getJobStatus(txn, job, false)
1564		if err != nil {
1565			return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
1566		}
1567	} else {
1568		job.CreateIndex = index
1569		job.ModifyIndex = index
1570		job.JobModifyIndex = index
1571
1572		if err := s.setJobStatus(index, txn, job, false, ""); err != nil {
1573			return fmt.Errorf("setting job status for %q failed: %v", job.ID, err)
1574		}
1575
1576		// Have to get the job again since it could have been updated
1577		updated, err := txn.First("jobs", "id", job.Namespace, job.ID)
1578		if err != nil {
1579			return fmt.Errorf("job lookup failed: %v", err)
1580		}
1581		if updated != nil {
1582			job = updated.(*structs.Job)
1583		}
1584	}
1585
1586	if err := s.updateSummaryWithJob(index, job, txn); err != nil {
1587		return fmt.Errorf("unable to create job summary: %v", err)
1588	}
1589
1590	if err := s.upsertJobVersion(index, job, txn); err != nil {
1591		return fmt.Errorf("unable to upsert job into job_version table: %v", err)
1592	}
1593
1594	if err := s.updateJobScalingPolicies(index, job, txn); err != nil {
1595		return fmt.Errorf("unable to update job scaling policies: %v", err)
1596	}
1597
1598	if err := s.updateJobRecommendations(index, txn, existingJob, job); err != nil {
1599		return fmt.Errorf("unable to update job recommendations: %v", err)
1600	}
1601
1602	if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil {
1603		return fmt.Errorf("unable to update job scaling policies: %v", err)
1604	}
1605
1606	// Insert the job
1607	if err := txn.Insert("jobs", job); err != nil {
1608		return fmt.Errorf("job insert failed: %v", err)
1609	}
1610	if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
1611		return fmt.Errorf("index update failed: %v", err)
1612	}
1613
1614	return nil
1615}
1616
1617// DeleteJob is used to deregister a job
1618func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
1619	txn := s.db.WriteTxn(index)
1620	defer txn.Abort()
1621
1622	err := s.DeleteJobTxn(index, namespace, jobID, txn)
1623	if err == nil {
1624		return txn.Commit()
1625	}
1626	return err
1627}
1628
1629// DeleteJobTxn is used to deregister a job, like DeleteJob,
1630// but in a transaction.  Useful for when making multiple modifications atomically
1631func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error {
1632	// Lookup the node
1633	existing, err := txn.First("jobs", "id", namespace, jobID)
1634	if err != nil {
1635		return fmt.Errorf("job lookup failed: %v", err)
1636	}
1637	if existing == nil {
1638		return fmt.Errorf("job not found")
1639	}
1640
1641	// Check if we should update a parent job summary
1642	job := existing.(*structs.Job)
1643	if job.ParentID != "" {
1644		summaryRaw, err := txn.First("job_summary", "id", namespace, job.ParentID)
1645		if err != nil {
1646			return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
1647		}
1648
1649		// Only continue if the summary exists. It could not exist if the parent
1650		// job was removed
1651		if summaryRaw != nil {
1652			existing := summaryRaw.(*structs.JobSummary)
1653			pSummary := existing.Copy()
1654			if pSummary.Children != nil {
1655
1656				modified := false
1657				switch job.Status {
1658				case structs.JobStatusPending:
1659					pSummary.Children.Pending--
1660					pSummary.Children.Dead++
1661					modified = true
1662				case structs.JobStatusRunning:
1663					pSummary.Children.Running--
1664					pSummary.Children.Dead++
1665					modified = true
1666				case structs.JobStatusDead:
1667				default:
1668					return fmt.Errorf("unknown old job status %q", job.Status)
1669				}
1670
1671				if modified {
1672					// Update the modify index
1673					pSummary.ModifyIndex = index
1674
1675					// Insert the summary
1676					if err := txn.Insert("job_summary", pSummary); err != nil {
1677						return fmt.Errorf("job summary insert failed: %v", err)
1678					}
1679					if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
1680						return fmt.Errorf("index update failed: %v", err)
1681					}
1682				}
1683			}
1684		}
1685	}
1686
1687	// Delete the job
1688	if err := txn.Delete("jobs", existing); err != nil {
1689		return fmt.Errorf("job delete failed: %v", err)
1690	}
1691	if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
1692		return fmt.Errorf("index update failed: %v", err)
1693	}
1694
1695	// Delete the job versions
1696	if err := s.deleteJobVersions(index, job, txn); err != nil {
1697		return err
1698	}
1699
1700	// Cleanup plugins registered by this job, before we delete the summary
1701	err = s.deleteJobFromPlugins(index, txn, job)
1702	if err != nil {
1703		return fmt.Errorf("deleting job from plugin: %v", err)
1704	}
1705
1706	// Delete the job summary
1707	if _, err = txn.DeleteAll("job_summary", "id", namespace, jobID); err != nil {
1708		return fmt.Errorf("deleting job summary failed: %v", err)
1709	}
1710	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
1711		return fmt.Errorf("index update failed: %v", err)
1712	}
1713
1714	// Delete any remaining job scaling policies
1715	if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
1716		return fmt.Errorf("deleting job scaling policies failed: %v", err)
1717	}
1718
1719	// Delete any job recommendations
1720	if err := s.deleteRecommendationsByJob(index, txn, job); err != nil {
1721		return fmt.Errorf("deleting job recommendatons failed: %v", err)
1722	}
1723
1724	// Delete the scaling events
1725	if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
1726		return fmt.Errorf("deleting job scaling events failed: %v", err)
1727	}
1728	if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
1729		return fmt.Errorf("index update failed: %v", err)
1730	}
1731
1732	return nil
1733}
1734
1735// deleteJobScalingPolicies deletes any scaling policies associated with the job
1736func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error {
1737	iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn)
1738	if err != nil {
1739		return fmt.Errorf("getting job scaling policies for deletion failed: %v", err)
1740	}
1741
1742	// Put them into a slice so there are no safety concerns while actually
1743	// performing the deletes
1744	policies := []interface{}{}
1745	for {
1746		raw := iter.Next()
1747		if raw == nil {
1748			break
1749		}
1750		policies = append(policies, raw)
1751	}
1752
1753	// Do the deletes
1754	for _, p := range policies {
1755		if err := txn.Delete("scaling_policy", p); err != nil {
1756			return fmt.Errorf("deleting scaling policy failed: %v", err)
1757		}
1758	}
1759
1760	if len(policies) > 0 {
1761		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
1762			return fmt.Errorf("index update failed: %v", err)
1763		}
1764	}
1765	return nil
1766}
1767
1768// deleteJobVersions deletes all versions of the given job.
1769func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *txn) error {
1770	iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID)
1771	if err != nil {
1772		return err
1773	}
1774
1775	// Put them into a slice so there are no safety concerns while actually
1776	// performing the deletes
1777	jobs := []*structs.Job{}
1778	for {
1779		raw := iter.Next()
1780		if raw == nil {
1781			break
1782		}
1783
1784		// Ensure the ID is an exact match
1785		j := raw.(*structs.Job)
1786		if j.ID != job.ID {
1787			continue
1788		}
1789
1790		jobs = append(jobs, j)
1791	}
1792
1793	// Do the deletes
1794	for _, j := range jobs {
1795		if err := txn.Delete("job_version", j); err != nil {
1796			return fmt.Errorf("deleting job versions failed: %v", err)
1797		}
1798	}
1799
1800	if err := txn.Insert("index", &IndexEntry{"job_version", index}); err != nil {
1801		return fmt.Errorf("index update failed: %v", err)
1802	}
1803
1804	return nil
1805}
1806
1807// upsertJobVersion inserts a job into its historic version table and limits the
1808// number of job versions that are tracked.
1809func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) error {
1810	// Insert the job
1811	if err := txn.Insert("job_version", job); err != nil {
1812		return fmt.Errorf("failed to insert job into job_version table: %v", err)
1813	}
1814
1815	if err := txn.Insert("index", &IndexEntry{"job_version", index}); err != nil {
1816		return fmt.Errorf("index update failed: %v", err)
1817	}
1818
1819	// Get all the historic jobs for this ID
1820	all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID)
1821	if err != nil {
1822		return fmt.Errorf("failed to look up job versions for %q: %v", job.ID, err)
1823	}
1824
1825	// If we are below the limit there is no GCing to be done
1826	if len(all) <= structs.JobTrackedVersions {
1827		return nil
1828	}
1829
1830	// We have to delete a historic job to make room.
1831	// Find index of the highest versioned stable job
1832	stableIdx := -1
1833	for i, j := range all {
1834		if j.Stable {
1835			stableIdx = i
1836			break
1837		}
1838	}
1839
1840	// If the stable job is the oldest version, do a swap to bring it into the
1841	// keep set.
1842	max := structs.JobTrackedVersions
1843	if stableIdx == max {
1844		all[max-1], all[max] = all[max], all[max-1]
1845	}
1846
1847	// Delete the job outside of the set that are being kept.
1848	d := all[max]
1849	if err := txn.Delete("job_version", d); err != nil {
1850		return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
1851	}
1852
1853	return nil
1854}
1855
1856// JobByID is used to lookup a job by its ID. JobByID returns the current/latest job
1857// version.
1858func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) {
1859	txn := s.db.ReadTxn()
1860	return s.JobByIDTxn(ws, namespace, id, txn)
1861}
1862
1863// JobByIDTxn is used to lookup a job by its ID, like  JobByID. JobByID returns the job version
1864// accessible through in the transaction
1865func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) {
1866	watchCh, existing, err := txn.FirstWatch("jobs", "id", namespace, id)
1867	if err != nil {
1868		return nil, fmt.Errorf("job lookup failed: %v", err)
1869	}
1870	ws.Add(watchCh)
1871
1872	if existing != nil {
1873		return existing.(*structs.Job), nil
1874	}
1875	return nil, nil
1876}
1877
1878// JobsByIDPrefix is used to lookup a job by prefix
1879func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
1880	txn := s.db.ReadTxn()
1881
1882	iter, err := txn.Get("jobs", "id_prefix", namespace, id)
1883	if err != nil {
1884		return nil, fmt.Errorf("job lookup failed: %v", err)
1885	}
1886
1887	ws.Add(iter.WatchCh())
1888
1889	return iter, nil
1890}
1891
1892// JobVersionsByID returns all the tracked versions of a job.
1893func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
1894	txn := s.db.ReadTxn()
1895
1896	return s.jobVersionByID(txn, ws, namespace, id)
1897}
1898
1899// jobVersionByID is the underlying implementation for retrieving all tracked
1900// versions of a job and is called under an existing transaction. A watch set
1901// can optionally be passed in to add the job histories to the watch set.
1902func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
1903	// Get all the historic jobs for this ID
1904	iter, err := txn.Get("job_version", "id_prefix", namespace, id)
1905	if err != nil {
1906		return nil, err
1907	}
1908
1909	ws.Add(iter.WatchCh())
1910
1911	var all []*structs.Job
1912	for {
1913		raw := iter.Next()
1914		if raw == nil {
1915			break
1916		}
1917
1918		// Ensure the ID is an exact match
1919		j := raw.(*structs.Job)
1920		if j.ID != id {
1921			continue
1922		}
1923
1924		all = append(all, j)
1925	}
1926
1927	// Sort in reverse order so that the highest version is first
1928	sort.Slice(all, func(i, j int) bool {
1929		return all[i].Version > all[j].Version
1930	})
1931
1932	return all, nil
1933}
1934
1935// JobByIDAndVersion returns the job identified by its ID and Version. The
1936// passed watchset may be nil.
1937func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) {
1938	txn := s.db.ReadTxn()
1939	return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn)
1940}
1941
1942// jobByIDAndVersionImpl returns the job identified by its ID and Version. The
1943// passed watchset may be nil.
1944func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string,
1945	version uint64, txn *txn) (*structs.Job, error) {
1946
1947	watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version)
1948	if err != nil {
1949		return nil, err
1950	}
1951
1952	ws.Add(watchCh)
1953
1954	if existing != nil {
1955		job := existing.(*structs.Job)
1956		return job, nil
1957	}
1958
1959	return nil, nil
1960}
1961
1962func (s *StateStore) JobVersions(ws memdb.WatchSet) (memdb.ResultIterator, error) {
1963	txn := s.db.ReadTxn()
1964
1965	// Walk the entire deployments table
1966	iter, err := txn.Get("job_version", "id")
1967	if err != nil {
1968		return nil, err
1969	}
1970
1971	ws.Add(iter.WatchCh())
1972	return iter, nil
1973}
1974
1975// Jobs returns an iterator over all the jobs
1976func (s *StateStore) Jobs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
1977	txn := s.db.ReadTxn()
1978
1979	// Walk the entire jobs table
1980	iter, err := txn.Get("jobs", "id")
1981	if err != nil {
1982		return nil, err
1983	}
1984
1985	ws.Add(iter.WatchCh())
1986
1987	return iter, nil
1988}
1989
1990// JobsByNamespace returns an iterator over all the jobs for the given namespace
1991func (s *StateStore) JobsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
1992	txn := s.db.ReadTxn()
1993	return s.jobsByNamespaceImpl(ws, namespace, txn)
1994}
1995
1996// jobsByNamespaceImpl returns an iterator over all the jobs for the given namespace
1997func (s *StateStore) jobsByNamespaceImpl(ws memdb.WatchSet, namespace string, txn *txn) (memdb.ResultIterator, error) {
1998	// Walk the entire jobs table
1999	iter, err := txn.Get("jobs", "id_prefix", namespace, "")
2000	if err != nil {
2001		return nil, err
2002	}
2003
2004	ws.Add(iter.WatchCh())
2005
2006	return iter, nil
2007}
2008
2009// JobsByPeriodic returns an iterator over all the periodic or non-periodic jobs.
2010func (s *StateStore) JobsByPeriodic(ws memdb.WatchSet, periodic bool) (memdb.ResultIterator, error) {
2011	txn := s.db.ReadTxn()
2012
2013	iter, err := txn.Get("jobs", "periodic", periodic)
2014	if err != nil {
2015		return nil, err
2016	}
2017
2018	ws.Add(iter.WatchCh())
2019
2020	return iter, nil
2021}
2022
2023// JobsByScheduler returns an iterator over all the jobs with the specific
2024// scheduler type.
2025func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (memdb.ResultIterator, error) {
2026	txn := s.db.ReadTxn()
2027
2028	// Return an iterator for jobs with the specific type.
2029	iter, err := txn.Get("jobs", "type", schedulerType)
2030	if err != nil {
2031		return nil, err
2032	}
2033
2034	ws.Add(iter.WatchCh())
2035
2036	return iter, nil
2037}
2038
2039// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage
2040// collection.
2041func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
2042	txn := s.db.ReadTxn()
2043
2044	iter, err := txn.Get("jobs", "gc", gc)
2045	if err != nil {
2046		return nil, err
2047	}
2048
2049	ws.Add(iter.WatchCh())
2050
2051	return iter, nil
2052}
2053
2054// JobSummary returns a job summary object which matches a specific id.
2055func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) {
2056	txn := s.db.ReadTxn()
2057
2058	watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID)
2059	if err != nil {
2060		return nil, err
2061	}
2062
2063	ws.Add(watchCh)
2064
2065	if existing != nil {
2066		summary := existing.(*structs.JobSummary)
2067		return summary, nil
2068	}
2069
2070	return nil, nil
2071}
2072
2073// JobSummaries walks the entire job summary table and returns all the job
2074// summary objects
2075func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, error) {
2076	txn := s.db.ReadTxn()
2077
2078	iter, err := txn.Get("job_summary", "id")
2079	if err != nil {
2080		return nil, err
2081	}
2082
2083	ws.Add(iter.WatchCh())
2084
2085	return iter, nil
2086}
2087
2088// JobSummaryByPrefix is used to look up Job Summary by id prefix
2089func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
2090	txn := s.db.ReadTxn()
2091
2092	iter, err := txn.Get("job_summary", "id_prefix", namespace, id)
2093	if err != nil {
2094		return nil, fmt.Errorf("job_summary lookup failed: %v", err)
2095	}
2096
2097	ws.Add(iter.WatchCh())
2098
2099	return iter, nil
2100}
2101
2102// CSIVolumeRegister adds a volume to the server store, failing if it already exists
2103func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolume) error {
2104	txn := s.db.WriteTxn(index)
2105	defer txn.Abort()
2106
2107	for _, v := range volumes {
2108		if exists, err := s.namespaceExists(txn, v.Namespace); err != nil {
2109			return err
2110		} else if !exists {
2111			return fmt.Errorf("volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
2112		}
2113
2114		// Check for volume existence
2115		obj, err := txn.First("csi_volumes", "id", v.Namespace, v.ID)
2116		if err != nil {
2117			return fmt.Errorf("volume existence check error: %v", err)
2118		}
2119		if obj != nil {
2120			// Allow some properties of a volume to be updated in place, but
2121			// prevent accidentally overwriting important properties, or
2122			// overwriting a volume in use
2123			old, ok := obj.(*structs.CSIVolume)
2124			if ok &&
2125				old.InUse() ||
2126				old.ExternalID != v.ExternalID ||
2127				old.PluginID != v.PluginID ||
2128				old.Provider != v.Provider {
2129				return fmt.Errorf("volume exists: %s", v.ID)
2130			}
2131		}
2132
2133		if v.CreateIndex == 0 {
2134			v.CreateIndex = index
2135			v.ModifyIndex = index
2136		}
2137
2138		// Allocations are copy on write, so we want to keep the Allocation ID
2139		// but we need to clear the pointer so that we don't store it when we
2140		// write the volume to the state store. We'll get it from the db in
2141		// denormalize.
2142		for allocID := range v.ReadAllocs {
2143			v.ReadAllocs[allocID] = nil
2144		}
2145		for allocID := range v.WriteAllocs {
2146			v.WriteAllocs[allocID] = nil
2147		}
2148
2149		err = txn.Insert("csi_volumes", v)
2150		if err != nil {
2151			return fmt.Errorf("volume insert: %v", err)
2152		}
2153	}
2154
2155	if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
2156		return fmt.Errorf("index update failed: %v", err)
2157	}
2158
2159	return txn.Commit()
2160}
2161
2162// CSIVolumes returns the unfiltered list of all volumes. Caller should
2163// snapshot if it wants to also denormalize the plugins.
2164func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
2165	txn := s.db.ReadTxn()
2166	defer txn.Abort()
2167
2168	iter, err := txn.Get("csi_volumes", "id")
2169	if err != nil {
2170		return nil, fmt.Errorf("csi_volumes lookup failed: %v", err)
2171	}
2172
2173	ws.Add(iter.WatchCh())
2174
2175	return iter, nil
2176}
2177
2178// CSIVolumeByID is used to lookup a single volume. Returns a copy of the
2179// volume because its plugins and allocations are denormalized to provide
2180// accurate Health.
2181func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*structs.CSIVolume, error) {
2182	txn := s.db.ReadTxn()
2183
2184	watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
2185	if err != nil {
2186		return nil, fmt.Errorf("volume lookup failed for %s: %v", id, err)
2187	}
2188	ws.Add(watchCh)
2189
2190	if obj == nil {
2191		return nil, nil
2192	}
2193	vol := obj.(*structs.CSIVolume)
2194
2195	// we return the volume with the plugins denormalized by default,
2196	// because the scheduler needs them for feasibility checking
2197	return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy())
2198}
2199
2200// CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it
2201// wants to also denormalize the plugins.
2202func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, pluginID string) (memdb.ResultIterator, error) {
2203	txn := s.db.ReadTxn()
2204
2205	iter, err := txn.Get("csi_volumes", "plugin_id", pluginID)
2206	if err != nil {
2207		return nil, fmt.Errorf("volume lookup failed: %v", err)
2208	}
2209
2210	// Filter the iterator by namespace
2211	f := func(raw interface{}) bool {
2212		v, ok := raw.(*structs.CSIVolume)
2213		if !ok {
2214			return false
2215		}
2216		return v.Namespace != namespace && strings.HasPrefix(v.ID, prefix)
2217	}
2218
2219	wrap := memdb.NewFilterIterator(iter, f)
2220	return wrap, nil
2221}
2222
2223// CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to
2224// also denormalize the plugins.
2225func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) {
2226	txn := s.db.ReadTxn()
2227
2228	iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID)
2229	if err != nil {
2230		return nil, err
2231	}
2232
2233	ws.Add(iter.WatchCh())
2234
2235	return iter, nil
2236}
2237
2238// CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should
2239// snapshot if it wants to also denormalize the plugins.
2240func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {
2241	allocs, err := s.AllocsByNode(ws, nodeID)
2242	if err != nil {
2243		return nil, fmt.Errorf("alloc lookup failed: %v", err)
2244	}
2245
2246	// Find volume ids for CSI volumes in running allocs, or allocs that we desire to run
2247	ids := map[string]string{} // Map volumeID to Namespace
2248	for _, a := range allocs {
2249		tg := a.Job.LookupTaskGroup(a.TaskGroup)
2250
2251		if !(a.DesiredStatus == structs.AllocDesiredStatusRun ||
2252			a.ClientStatus == structs.AllocClientStatusRunning) ||
2253			len(tg.Volumes) == 0 {
2254			continue
2255		}
2256
2257		for _, v := range tg.Volumes {
2258			if v.Type != structs.VolumeTypeCSI {
2259				continue
2260			}
2261			ids[v.Source] = a.Namespace
2262		}
2263	}
2264
2265	// Lookup the raw CSIVolumes to match the other list interfaces
2266	iter := NewSliceIterator()
2267	txn := s.db.ReadTxn()
2268	for id, namespace := range ids {
2269		if strings.HasPrefix(id, prefix) {
2270			watchCh, raw, err := txn.FirstWatch("csi_volumes", "id", namespace, id)
2271			if err != nil {
2272				return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
2273			}
2274			ws.Add(watchCh)
2275			iter.Add(raw)
2276		}
2277	}
2278
2279	return iter, nil
2280}
2281
2282// CSIVolumesByNamespace looks up the entire csi_volumes table
2283func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
2284	txn := s.db.ReadTxn()
2285
2286	iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix)
2287	if err != nil {
2288		return nil, fmt.Errorf("volume lookup failed: %v", err)
2289	}
2290
2291	ws.Add(iter.WatchCh())
2292
2293	return iter, nil
2294}
2295
2296// CSIVolumeClaim updates the volume's claim count and allocation list
2297func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *structs.CSIVolumeClaim) error {
2298	txn := s.db.WriteTxn(index)
2299	defer txn.Abort()
2300
2301	row, err := txn.First("csi_volumes", "id", namespace, id)
2302	if err != nil {
2303		return fmt.Errorf("volume lookup failed: %s: %v", id, err)
2304	}
2305	if row == nil {
2306		return fmt.Errorf("volume not found: %s", id)
2307	}
2308
2309	orig, ok := row.(*structs.CSIVolume)
2310	if !ok {
2311		return fmt.Errorf("volume row conversion error")
2312	}
2313
2314	var alloc *structs.Allocation
2315	if claim.State == structs.CSIVolumeClaimStateTaken {
2316		alloc, err = s.allocByIDImpl(txn, nil, claim.AllocationID)
2317		if err != nil {
2318			s.logger.Error("AllocByID failed", "error", err)
2319			return fmt.Errorf(structs.ErrUnknownAllocationPrefix)
2320		}
2321		if alloc == nil {
2322			s.logger.Error("AllocByID failed to find alloc", "alloc_id", claim.AllocationID)
2323			if err != nil {
2324				return fmt.Errorf(structs.ErrUnknownAllocationPrefix)
2325			}
2326		}
2327	}
2328
2329	volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy())
2330	if err != nil {
2331		return err
2332	}
2333	volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume)
2334	if err != nil {
2335		return err
2336	}
2337
2338	// in the case of a job deregistration, there will be no allocation ID
2339	// for the claim but we still want to write an updated index to the volume
2340	// so that volume reaping is triggered
2341	if claim.AllocationID != "" {
2342		err = volume.Claim(claim, alloc)
2343		if err != nil {
2344			return err
2345		}
2346	}
2347
2348	volume.ModifyIndex = index
2349
2350	// Allocations are copy on write, so we want to keep the Allocation ID
2351	// but we need to clear the pointer so that we don't store it when we
2352	// write the volume to the state store. We'll get it from the db in
2353	// denormalize.
2354	for allocID := range volume.ReadAllocs {
2355		volume.ReadAllocs[allocID] = nil
2356	}
2357	for allocID := range volume.WriteAllocs {
2358		volume.WriteAllocs[allocID] = nil
2359	}
2360
2361	if err = txn.Insert("csi_volumes", volume); err != nil {
2362		return fmt.Errorf("volume update failed: %s: %v", id, err)
2363	}
2364
2365	if err = txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
2366		return fmt.Errorf("index update failed: %v", err)
2367	}
2368
2369	return txn.Commit()
2370}
2371
2372// CSIVolumeDeregister removes the volume from the server
2373func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []string, force bool) error {
2374	txn := s.db.WriteTxn(index)
2375	defer txn.Abort()
2376
2377	for _, id := range ids {
2378		existing, err := txn.First("csi_volumes", "id_prefix", namespace, id)
2379		if err != nil {
2380			return fmt.Errorf("volume lookup failed: %s: %v", id, err)
2381		}
2382
2383		if existing == nil {
2384			return fmt.Errorf("volume not found: %s", id)
2385		}
2386
2387		vol, ok := existing.(*structs.CSIVolume)
2388		if !ok {
2389			return fmt.Errorf("volume row conversion error: %s", id)
2390		}
2391
2392		// The common case for a volume deregister is when the volume is
2393		// unused, but we can also let an operator intervene in the case where
2394		// allocations have been stopped but claims can't be freed because
2395		// ex. the plugins have all been removed.
2396		if vol.InUse() {
2397			if !force || !s.volSafeToForce(txn, vol) {
2398				return fmt.Errorf("volume in use: %s", id)
2399			}
2400		}
2401
2402		if err = txn.Delete("csi_volumes", existing); err != nil {
2403			return fmt.Errorf("volume delete failed: %s: %v", id, err)
2404		}
2405	}
2406
2407	if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
2408		return fmt.Errorf("index update failed: %v", err)
2409	}
2410
2411	return txn.Commit()
2412}
2413
2414// volSafeToForce checks if the any of the remaining allocations
2415// are in a non-terminal state.
2416func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool {
2417	vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v)
2418	if err != nil {
2419		return false
2420	}
2421
2422	for _, alloc := range vol.ReadAllocs {
2423		if alloc != nil && !alloc.TerminalStatus() {
2424			return false
2425		}
2426	}
2427	for _, alloc := range vol.WriteAllocs {
2428		if alloc != nil && !alloc.TerminalStatus() {
2429			return false
2430		}
2431	}
2432	return true
2433}
2434
2435// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and
2436// plugins, but without allocations.
2437// Use this for current volume metadata, handling lists of volumes.
2438// Use CSIVolumeDenormalize for volumes containing both health and current
2439// allocations.
2440func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
2441	if vol == nil {
2442		return nil, nil
2443	}
2444	txn := s.db.ReadTxn()
2445	defer txn.Abort()
2446	return s.CSIVolumeDenormalizePluginsTxn(txn, vol)
2447}
2448
2449// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and
2450// plugins, but without allocations.
2451// Use this for current volume metadata, handling lists of volumes.
2452// Use CSIVolumeDenormalize for volumes containing both health and current
2453// allocations.
2454func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
2455	if vol == nil {
2456		return nil, nil
2457	}
2458	plug, err := s.CSIPluginByIDTxn(txn, nil, vol.PluginID)
2459	if err != nil {
2460		return nil, fmt.Errorf("plugin lookup error: %s %v", vol.PluginID, err)
2461	}
2462	if plug == nil {
2463		vol.ControllersHealthy = 0
2464		vol.NodesHealthy = 0
2465		vol.Schedulable = false
2466		return vol, nil
2467	}
2468
2469	vol.Provider = plug.Provider
2470	vol.ProviderVersion = plug.Version
2471	vol.ControllerRequired = plug.ControllerRequired
2472	vol.ControllersHealthy = plug.ControllersHealthy
2473	vol.NodesHealthy = plug.NodesHealthy
2474
2475	// This value may be stale, but stale is ok
2476	vol.ControllersExpected = plug.ControllersExpected
2477	vol.NodesExpected = plug.NodesExpected
2478
2479	vol.Schedulable = vol.NodesHealthy > 0
2480	if vol.ControllerRequired {
2481		vol.Schedulable = vol.ControllersHealthy > 0 && vol.Schedulable
2482	}
2483
2484	return vol, nil
2485}
2486
2487// CSIVolumeDenormalize returns a CSIVolume with allocations
2488func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
2489	txn := s.db.ReadTxn()
2490	return s.CSIVolumeDenormalizeTxn(txn, ws, vol)
2491}
2492
2493// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations
2494func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
2495	if vol == nil {
2496		return nil, nil
2497	}
2498	for id := range vol.ReadAllocs {
2499		a, err := s.allocByIDImpl(txn, ws, id)
2500		if err != nil {
2501			return nil, err
2502		}
2503		if a != nil {
2504			vol.ReadAllocs[id] = a
2505			// COMPAT(1.0): the CSIVolumeClaim fields were added
2506			// after 0.11.1, so claims made before that may be
2507			// missing this value. (same for WriteAlloc below)
2508			if _, ok := vol.ReadClaims[id]; !ok {
2509				vol.ReadClaims[id] = &structs.CSIVolumeClaim{
2510					AllocationID: a.ID,
2511					NodeID:       a.NodeID,
2512					Mode:         structs.CSIVolumeClaimRead,
2513					State:        structs.CSIVolumeClaimStateTaken,
2514				}
2515			}
2516		}
2517	}
2518
2519	for id := range vol.WriteAllocs {
2520		a, err := s.allocByIDImpl(txn, ws, id)
2521		if err != nil {
2522			return nil, err
2523		}
2524		if a != nil {
2525			vol.WriteAllocs[id] = a
2526			if _, ok := vol.WriteClaims[id]; !ok {
2527				vol.WriteClaims[id] = &structs.CSIVolumeClaim{
2528					AllocationID: a.ID,
2529					NodeID:       a.NodeID,
2530					Mode:         structs.CSIVolumeClaimWrite,
2531					State:        structs.CSIVolumeClaimStateTaken,
2532				}
2533			}
2534		}
2535	}
2536
2537	// COMPAT: the AccessMode and AttachmentMode fields were added to claims
2538	// in 1.1.0, so claims made before that may be missing this value. In this
2539	// case, the volume will already have AccessMode/AttachmentMode until it
2540	// no longer has any claims, so set from those values
2541	for _, claim := range vol.ReadClaims {
2542		if claim.AccessMode == "" || claim.AttachmentMode == "" {
2543			claim.AccessMode = vol.AccessMode
2544			claim.AttachmentMode = vol.AttachmentMode
2545		}
2546	}
2547	for _, claim := range vol.WriteClaims {
2548		if claim.AccessMode == "" || claim.AttachmentMode == "" {
2549			claim.AccessMode = vol.AccessMode
2550			claim.AttachmentMode = vol.AttachmentMode
2551		}
2552	}
2553
2554	return vol, nil
2555}
2556
2557// CSIPlugins returns the unfiltered list of all plugin health status
2558func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) {
2559	txn := s.db.ReadTxn()
2560	defer txn.Abort()
2561
2562	iter, err := txn.Get("csi_plugins", "id")
2563	if err != nil {
2564		return nil, fmt.Errorf("csi_plugins lookup failed: %v", err)
2565	}
2566
2567	ws.Add(iter.WatchCh())
2568
2569	return iter, nil
2570}
2571
2572// CSIPluginsByIDPrefix supports search
2573func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) {
2574	txn := s.db.ReadTxn()
2575
2576	iter, err := txn.Get("csi_plugins", "id_prefix", pluginID)
2577	if err != nil {
2578		return nil, err
2579	}
2580
2581	ws.Add(iter.WatchCh())
2582
2583	return iter, nil
2584}
2585
2586// CSIPluginByID returns a named CSIPlugin. This method creates a new
2587// transaction so you should not call it from within another transaction.
2588func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
2589	txn := s.db.ReadTxn()
2590	plugin, err := s.CSIPluginByIDTxn(txn, ws, id)
2591	if err != nil {
2592		return nil, err
2593	}
2594	return plugin, nil
2595}
2596
2597// CSIPluginByIDTxn returns a named CSIPlugin
2598func (s *StateStore) CSIPluginByIDTxn(txn Txn, ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
2599
2600	watchCh, obj, err := txn.FirstWatch("csi_plugins", "id_prefix", id)
2601	if err != nil {
2602		return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err)
2603	}
2604
2605	ws.Add(watchCh)
2606
2607	if obj != nil {
2608		return obj.(*structs.CSIPlugin), nil
2609	}
2610	return nil, nil
2611}
2612
2613// CSIPluginDenormalize returns a CSIPlugin with allocation details. Always called on a copy of the plugin.
2614func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPlugin) (*structs.CSIPlugin, error) {
2615	txn := s.db.ReadTxn()
2616	return s.CSIPluginDenormalizeTxn(txn, ws, plug)
2617}
2618
2619func (s *StateStore) CSIPluginDenormalizeTxn(txn Txn, ws memdb.WatchSet, plug *structs.CSIPlugin) (*structs.CSIPlugin, error) {
2620	if plug == nil {
2621		return nil, nil
2622	}
2623
2624	// Get the unique list of allocation ids
2625	ids := map[string]struct{}{}
2626	for _, info := range plug.Controllers {
2627		ids[info.AllocID] = struct{}{}
2628	}
2629	for _, info := range plug.Nodes {
2630		ids[info.AllocID] = struct{}{}
2631	}
2632
2633	for id := range ids {
2634		alloc, err := s.allocByIDImpl(txn, ws, id)
2635		if err != nil {
2636			return nil, err
2637		}
2638		if alloc == nil {
2639			continue
2640		}
2641		plug.Allocations = append(plug.Allocations, alloc.Stub(nil))
2642	}
2643
2644	return plug, nil
2645}
2646
2647// UpsertCSIPlugin writes the plugin to the state store. Note: there
2648// is currently no raft message for this, as it's intended to support
2649// testing use cases.
2650func (s *StateStore) UpsertCSIPlugin(index uint64, plug *structs.CSIPlugin) error {
2651	txn := s.db.WriteTxn(index)
2652	defer txn.Abort()
2653
2654	existing, err := txn.First("csi_plugins", "id", plug.ID)
2655	if err != nil {
2656		return fmt.Errorf("csi_plugin lookup error: %s %v", plug.ID, err)
2657	}
2658
2659	plug.ModifyIndex = index
2660	if existing != nil {
2661		plug.CreateIndex = existing.(*structs.CSIPlugin).CreateIndex
2662	}
2663
2664	err = txn.Insert("csi_plugins", plug)
2665	if err != nil {
2666		return fmt.Errorf("csi_plugins insert error: %v", err)
2667	}
2668	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
2669		return fmt.Errorf("index update failed: %v", err)
2670	}
2671	return txn.Commit()
2672}
2673
2674// DeleteCSIPlugin deletes the plugin if it's not in use.
2675func (s *StateStore) DeleteCSIPlugin(index uint64, id string) error {
2676	txn := s.db.WriteTxn(index)
2677	defer txn.Abort()
2678
2679	plug, err := s.CSIPluginByIDTxn(txn, nil, id)
2680	if err != nil {
2681		return err
2682	}
2683
2684	if plug == nil {
2685		return nil
2686	}
2687
2688	plug, err = s.CSIPluginDenormalizeTxn(txn, nil, plug.Copy())
2689	if err != nil {
2690		return err
2691	}
2692	if !plug.IsEmpty() {
2693		return fmt.Errorf("plugin in use")
2694	}
2695
2696	err = txn.Delete("csi_plugins", plug)
2697	if err != nil {
2698		return fmt.Errorf("csi_plugins delete error: %v", err)
2699	}
2700	return txn.Commit()
2701}
2702
2703// UpsertPeriodicLaunch is used to register a launch or update it.
2704func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
2705	txn := s.db.WriteTxn(index)
2706	defer txn.Abort()
2707
2708	// Check if the job already exists
2709	existing, err := txn.First("periodic_launch", "id", launch.Namespace, launch.ID)
2710	if err != nil {
2711		return fmt.Errorf("periodic launch lookup failed: %v", err)
2712	}
2713
2714	// Setup the indexes correctly
2715	if existing != nil {
2716		launch.CreateIndex = existing.(*structs.PeriodicLaunch).CreateIndex
2717		launch.ModifyIndex = index
2718	} else {
2719		launch.CreateIndex = index
2720		launch.ModifyIndex = index
2721	}
2722
2723	// Insert the job
2724	if err := txn.Insert("periodic_launch", launch); err != nil {
2725		return fmt.Errorf("launch insert failed: %v", err)
2726	}
2727	if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
2728		return fmt.Errorf("index update failed: %v", err)
2729	}
2730
2731	return txn.Commit()
2732}
2733
2734// DeletePeriodicLaunch is used to delete the periodic launch
2735func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) error {
2736	txn := s.db.WriteTxn(index)
2737	defer txn.Abort()
2738
2739	err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn)
2740	if err == nil {
2741		return txn.Commit()
2742	}
2743	return err
2744}
2745
2746// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch
2747// but in a transaction.  Useful for when making multiple modifications atomically
2748func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error {
2749	// Lookup the launch
2750	existing, err := txn.First("periodic_launch", "id", namespace, jobID)
2751	if err != nil {
2752		return fmt.Errorf("launch lookup failed: %v", err)
2753	}
2754	if existing == nil {
2755		return fmt.Errorf("launch not found")
2756	}
2757
2758	// Delete the launch
2759	if err := txn.Delete("periodic_launch", existing); err != nil {
2760		return fmt.Errorf("launch delete failed: %v", err)
2761	}
2762	if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil {
2763		return fmt.Errorf("index update failed: %v", err)
2764	}
2765
2766	return nil
2767}
2768
2769// PeriodicLaunchByID is used to lookup a periodic launch by the periodic job
2770// ID.
2771func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) {
2772	txn := s.db.ReadTxn()
2773
2774	watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id)
2775	if err != nil {
2776		return nil, fmt.Errorf("periodic launch lookup failed: %v", err)
2777	}
2778
2779	ws.Add(watchCh)
2780
2781	if existing != nil {
2782		return existing.(*structs.PeriodicLaunch), nil
2783	}
2784	return nil, nil
2785}
2786
2787// PeriodicLaunches returns an iterator over all the periodic launches
2788func (s *StateStore) PeriodicLaunches(ws memdb.WatchSet) (memdb.ResultIterator, error) {
2789	txn := s.db.ReadTxn()
2790
2791	// Walk the entire table
2792	iter, err := txn.Get("periodic_launch", "id")
2793	if err != nil {
2794		return nil, err
2795	}
2796
2797	ws.Add(iter.WatchCh())
2798
2799	return iter, nil
2800}
2801
2802// UpsertEvals is used to upsert a set of evaluations
2803func (s *StateStore) UpsertEvals(msgType structs.MessageType, index uint64, evals []*structs.Evaluation) error {
2804	txn := s.db.WriteTxnMsgT(msgType, index)
2805	defer txn.Abort()
2806
2807	err := s.UpsertEvalsTxn(index, evals, txn)
2808	if err == nil {
2809		return txn.Commit()
2810	}
2811	return err
2812}
2813
2814// UpsertEvals is used to upsert a set of evaluations, like UpsertEvals
2815// but in a transaction.  Useful for when making multiple modifications atomically
2816func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error {
2817	// Do a nested upsert
2818	jobs := make(map[structs.NamespacedID]string, len(evals))
2819	for _, eval := range evals {
2820		if err := s.nestedUpsertEval(txn, index, eval); err != nil {
2821			return err
2822		}
2823
2824		tuple := structs.NamespacedID{
2825			ID:        eval.JobID,
2826			Namespace: eval.Namespace,
2827		}
2828		jobs[tuple] = ""
2829	}
2830
2831	// Set the job's status
2832	if err := s.setJobStatuses(index, txn, jobs, false); err != nil {
2833		return fmt.Errorf("setting job status failed: %v", err)
2834	}
2835
2836	return nil
2837}
2838
2839// nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction
2840func (s *StateStore) nestedUpsertEval(txn *txn, index uint64, eval *structs.Evaluation) error {
2841	// Lookup the evaluation
2842	existing, err := txn.First("evals", "id", eval.ID)
2843	if err != nil {
2844		return fmt.Errorf("eval lookup failed: %v", err)
2845	}
2846
2847	// Update the indexes
2848	if existing != nil {
2849		eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex
2850		eval.ModifyIndex = index
2851	} else {
2852		eval.CreateIndex = index
2853		eval.ModifyIndex = index
2854	}
2855
2856	// Update the job summary
2857	summaryRaw, err := txn.First("job_summary", "id", eval.Namespace, eval.JobID)
2858	if err != nil {
2859		return fmt.Errorf("job summary lookup failed: %v", err)
2860	}
2861	if summaryRaw != nil {
2862		js := summaryRaw.(*structs.JobSummary).Copy()
2863		hasSummaryChanged := false
2864		for tg, num := range eval.QueuedAllocations {
2865			if summary, ok := js.Summary[tg]; ok {
2866				if summary.Queued != num {
2867					summary.Queued = num
2868					js.Summary[tg] = summary
2869					hasSummaryChanged = true
2870				}
2871			} else {
2872				s.logger.Error("unable to update queued for job and task group", "job_id", eval.JobID, "task_group", tg, "namespace", eval.Namespace)
2873			}
2874		}
2875
2876		// Insert the job summary
2877		if hasSummaryChanged {
2878			js.ModifyIndex = index
2879			if err := txn.Insert("job_summary", js); err != nil {
2880				return fmt.Errorf("job summary insert failed: %v", err)
2881			}
2882			if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
2883				return fmt.Errorf("index update failed: %v", err)
2884			}
2885		}
2886	}
2887
2888	// Check if the job has any blocked evaluations and cancel them
2889	if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 {
2890		// Get the blocked evaluation for a job if it exists
2891		iter, err := txn.Get("evals", "job", eval.Namespace, eval.JobID, structs.EvalStatusBlocked)
2892		if err != nil {
2893			return fmt.Errorf("failed to get blocked evals for job %q in namespace %q: %v", eval.JobID, eval.Namespace, err)
2894		}
2895
2896		var blocked []*structs.Evaluation
2897		for {
2898			raw := iter.Next()
2899			if raw == nil {
2900				break
2901			}
2902			blocked = append(blocked, raw.(*structs.Evaluation))
2903		}
2904
2905		// Go through and update the evals
2906		for _, eval := range blocked {
2907			newEval := eval.Copy()
2908			newEval.Status = structs.EvalStatusCancelled
2909			newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID)
2910			newEval.ModifyIndex = index
2911
2912			if err := txn.Insert("evals", newEval); err != nil {
2913				return fmt.Errorf("eval insert failed: %v", err)
2914			}
2915		}
2916	}
2917
2918	// Insert the eval
2919	if err := txn.Insert("evals", eval); err != nil {
2920		return fmt.Errorf("eval insert failed: %v", err)
2921	}
2922	if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
2923		return fmt.Errorf("index update failed: %v", err)
2924	}
2925	return nil
2926}
2927
2928// updateEvalModifyIndex is used to update the modify index of an evaluation that has been
2929// through a scheduler pass. This is done as part of plan apply. It ensures that when a subsequent
2930// scheduler workers process a re-queued evaluation it sees any partial updates from the plan apply.
2931func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string) error {
2932	// Lookup the evaluation
2933	existing, err := txn.First("evals", "id", evalID)
2934	if err != nil {
2935		return fmt.Errorf("eval lookup failed: %v", err)
2936	}
2937	if existing == nil {
2938		s.logger.Error("unable to find eval", "eval_id", evalID)
2939		return fmt.Errorf("unable to find eval id %q", evalID)
2940	}
2941	eval := existing.(*structs.Evaluation).Copy()
2942	// Update the indexes
2943	eval.ModifyIndex = index
2944
2945	// Insert the eval
2946	if err := txn.Insert("evals", eval); err != nil {
2947		return fmt.Errorf("eval insert failed: %v", err)
2948	}
2949	if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
2950		return fmt.Errorf("index update failed: %v", err)
2951	}
2952	return nil
2953}
2954
2955// DeleteEval is used to delete an evaluation
2956func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
2957	txn := s.db.WriteTxn(index)
2958	defer txn.Abort()
2959
2960	jobs := make(map[structs.NamespacedID]string, len(evals))
2961	for _, eval := range evals {
2962		existing, err := txn.First("evals", "id", eval)
2963		if err != nil {
2964			return fmt.Errorf("eval lookup failed: %v", err)
2965		}
2966		if existing == nil {
2967			continue
2968		}
2969		if err := txn.Delete("evals", existing); err != nil {
2970			return fmt.Errorf("eval delete failed: %v", err)
2971		}
2972		eval := existing.(*structs.Evaluation)
2973
2974		tuple := structs.NamespacedID{
2975			ID:        eval.JobID,
2976			Namespace: eval.Namespace,
2977		}
2978		jobs[tuple] = ""
2979	}
2980
2981	for _, alloc := range allocs {
2982		raw, err := txn.First("allocs", "id", alloc)
2983		if err != nil {
2984			return fmt.Errorf("alloc lookup failed: %v", err)
2985		}
2986		if raw == nil {
2987			continue
2988		}
2989		if err := txn.Delete("allocs", raw); err != nil {
2990			return fmt.Errorf("alloc delete failed: %v", err)
2991		}
2992	}
2993
2994	// Update the indexes
2995	if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
2996		return fmt.Errorf("index update failed: %v", err)
2997	}
2998	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
2999		return fmt.Errorf("index update failed: %v", err)
3000	}
3001
3002	// Set the job's status
3003	if err := s.setJobStatuses(index, txn, jobs, true); err != nil {
3004		return fmt.Errorf("setting job status failed: %v", err)
3005	}
3006
3007	return txn.Commit()
3008}
3009
3010// EvalByID is used to lookup an eval by its ID
3011func (s *StateStore) EvalByID(ws memdb.WatchSet, id string) (*structs.Evaluation, error) {
3012	txn := s.db.ReadTxn()
3013
3014	watchCh, existing, err := txn.FirstWatch("evals", "id", id)
3015	if err != nil {
3016		return nil, fmt.Errorf("eval lookup failed: %v", err)
3017	}
3018
3019	ws.Add(watchCh)
3020
3021	if existing != nil {
3022		return existing.(*structs.Evaluation), nil
3023	}
3024	return nil, nil
3025}
3026
3027// EvalsByIDPrefix is used to lookup evaluations by prefix in a particular
3028// namespace
3029func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
3030	txn := s.db.ReadTxn()
3031
3032	// Get an iterator over all evals by the id prefix
3033	iter, err := txn.Get("evals", "id_prefix", id)
3034	if err != nil {
3035		return nil, fmt.Errorf("eval lookup failed: %v", err)
3036	}
3037
3038	ws.Add(iter.WatchCh())
3039
3040	// Wrap the iterator in a filter
3041	wrap := memdb.NewFilterIterator(iter, evalNamespaceFilter(namespace))
3042	return wrap, nil
3043}
3044
3045// evalNamespaceFilter returns a filter function that filters all evaluations
3046// not in the given namespace.
3047func evalNamespaceFilter(namespace string) func(interface{}) bool {
3048	return func(raw interface{}) bool {
3049		eval, ok := raw.(*structs.Evaluation)
3050		if !ok {
3051			return true
3052		}
3053
3054		return eval.Namespace != namespace
3055	}
3056}
3057
3058// EvalsByJob returns all the evaluations by job id
3059func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) {
3060	txn := s.db.ReadTxn()
3061
3062	// Get an iterator over the node allocations
3063	iter, err := txn.Get("evals", "job_prefix", namespace, jobID)
3064	if err != nil {
3065		return nil, err
3066	}
3067
3068	ws.Add(iter.WatchCh())
3069
3070	var out []*structs.Evaluation
3071	for {
3072		raw := iter.Next()
3073		if raw == nil {
3074			break
3075		}
3076
3077		e := raw.(*structs.Evaluation)
3078
3079		// Filter non-exact matches
3080		if e.JobID != jobID {
3081			continue
3082		}
3083
3084		out = append(out, e)
3085	}
3086	return out, nil
3087}
3088
3089// Evals returns an iterator over all the evaluations
3090func (s *StateStore) Evals(ws memdb.WatchSet) (memdb.ResultIterator, error) {
3091	txn := s.db.ReadTxn()
3092
3093	// Walk the entire table
3094	iter, err := txn.Get("evals", "id")
3095	if err != nil {
3096		return nil, err
3097	}
3098
3099	ws.Add(iter.WatchCh())
3100
3101	return iter, nil
3102}
3103
3104// EvalsByNamespace returns an iterator over all the evaluations in the given
3105// namespace
3106func (s *StateStore) EvalsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
3107	txn := s.db.ReadTxn()
3108
3109	// Walk the entire table
3110	iter, err := txn.Get("evals", "namespace", namespace)
3111	if err != nil {
3112		return nil, err
3113	}
3114
3115	ws.Add(iter.WatchCh())
3116
3117	return iter, nil
3118}
3119
3120// UpdateAllocsFromClient is used to update an allocation based on input
3121// from a client. While the schedulers are the authority on the allocation for
3122// most things, some updates are authoritative from the client. Specifically,
3123// the desired state comes from the schedulers, while the actual state comes
3124// from clients.
3125func (s *StateStore) UpdateAllocsFromClient(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error {
3126	txn := s.db.WriteTxnMsgT(msgType, index)
3127	defer txn.Abort()
3128
3129	// Handle each of the updated allocations
3130	for _, alloc := range allocs {
3131		if err := s.nestedUpdateAllocFromClient(txn, index, alloc); err != nil {
3132			return err
3133		}
3134	}
3135
3136	// Update the indexes
3137	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
3138		return fmt.Errorf("index update failed: %v", err)
3139	}
3140
3141	return txn.Commit()
3142}
3143
3144// nestedUpdateAllocFromClient is used to nest an update of an allocation with client status
3145func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *structs.Allocation) error {
3146	// Look for existing alloc
3147	existing, err := txn.First("allocs", "id", alloc.ID)
3148	if err != nil {
3149		return fmt.Errorf("alloc lookup failed: %v", err)
3150	}
3151
3152	// Nothing to do if this does not exist
3153	if existing == nil {
3154		return nil
3155	}
3156	exist := existing.(*structs.Allocation)
3157
3158	// Copy everything from the existing allocation
3159	copyAlloc := exist.Copy()
3160
3161	// Pull in anything the client is the authority on
3162	copyAlloc.ClientStatus = alloc.ClientStatus
3163	copyAlloc.ClientDescription = alloc.ClientDescription
3164	copyAlloc.TaskStates = alloc.TaskStates
3165	copyAlloc.NetworkStatus = alloc.NetworkStatus
3166
3167	// The client can only set its deployment health and timestamp, so just take
3168	// those
3169	if copyAlloc.DeploymentStatus != nil && alloc.DeploymentStatus != nil {
3170		oldHasHealthy := copyAlloc.DeploymentStatus.HasHealth()
3171		newHasHealthy := alloc.DeploymentStatus.HasHealth()
3172
3173		// We got new health information from the client
3174		if newHasHealthy && (!oldHasHealthy || *copyAlloc.DeploymentStatus.Healthy != *alloc.DeploymentStatus.Healthy) {
3175			// Updated deployment health and timestamp
3176			copyAlloc.DeploymentStatus.Healthy = helper.BoolToPtr(*alloc.DeploymentStatus.Healthy)
3177			copyAlloc.DeploymentStatus.Timestamp = alloc.DeploymentStatus.Timestamp
3178			copyAlloc.DeploymentStatus.ModifyIndex = index
3179		}
3180	} else if alloc.DeploymentStatus != nil {
3181		// First time getting a deployment status so copy everything and just
3182		// set the index
3183		copyAlloc.DeploymentStatus = alloc.DeploymentStatus.Copy()
3184		copyAlloc.DeploymentStatus.ModifyIndex = index
3185	}
3186
3187	// Update the modify index
3188	copyAlloc.ModifyIndex = index
3189
3190	// Update the modify time
3191	copyAlloc.ModifyTime = alloc.ModifyTime
3192
3193	if err := s.updateDeploymentWithAlloc(index, copyAlloc, exist, txn); err != nil {
3194		return fmt.Errorf("error updating deployment: %v", err)
3195	}
3196
3197	if err := s.updateSummaryWithAlloc(index, copyAlloc, exist, txn); err != nil {
3198		return fmt.Errorf("error updating job summary: %v", err)
3199	}
3200
3201	if err := s.updateEntWithAlloc(index, copyAlloc, exist, txn); err != nil {
3202		return err
3203	}
3204
3205	if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
3206		return err
3207	}
3208
3209	// Update the allocation
3210	if err := txn.Insert("allocs", copyAlloc); err != nil {
3211		return fmt.Errorf("alloc insert failed: %v", err)
3212	}
3213
3214	// Set the job's status
3215	forceStatus := ""
3216	if !copyAlloc.TerminalStatus() {
3217		forceStatus = structs.JobStatusRunning
3218	}
3219
3220	tuple := structs.NamespacedID{
3221		ID:        exist.JobID,
3222		Namespace: exist.Namespace,
3223	}
3224	jobs := map[structs.NamespacedID]string{tuple: forceStatus}
3225
3226	if err := s.setJobStatuses(index, txn, jobs, false); err != nil {
3227		return fmt.Errorf("setting job status failed: %v", err)
3228	}
3229	return nil
3230}
3231
3232// UpsertAllocs is used to evict a set of allocations and allocate new ones at
3233// the same time.
3234func (s *StateStore) UpsertAllocs(msgType structs.MessageType, index uint64, allocs []*structs.Allocation) error {
3235	txn := s.db.WriteTxn(index)
3236	defer txn.Abort()
3237	if err := s.upsertAllocsImpl(index, allocs, txn); err != nil {
3238		return err
3239	}
3240	return txn.Commit()
3241}
3242
3243// upsertAllocs is the actual implementation of UpsertAllocs so that it may be
3244// used with an existing transaction.
3245func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation, txn *txn) error {
3246	// Handle the allocations
3247	jobs := make(map[structs.NamespacedID]string, 1)
3248	for _, alloc := range allocs {
3249		existing, err := txn.First("allocs", "id", alloc.ID)
3250		if err != nil {
3251			return fmt.Errorf("alloc lookup failed: %v", err)
3252		}
3253		exist, _ := existing.(*structs.Allocation)
3254
3255		if exist == nil {
3256			alloc.CreateIndex = index
3257			alloc.ModifyIndex = index
3258			alloc.AllocModifyIndex = index
3259			if alloc.DeploymentStatus != nil {
3260				alloc.DeploymentStatus.ModifyIndex = index
3261			}
3262
3263			// Issue https://github.com/hashicorp/nomad/issues/2583 uncovered
3264			// the a race between a forced garbage collection and the scheduler
3265			// marking an allocation as terminal. The issue is that the
3266			// allocation from the scheduler has its job normalized and the FSM
3267			// will only denormalize if the allocation is not terminal.  However
3268			// if the allocation is garbage collected, that will result in a
3269			// allocation being upserted for the first time without a job
3270			// attached. By returning an error here, it will cause the FSM to
3271			// error, causing the plan_apply to error and thus causing the
3272			// evaluation to be failed. This will force an index refresh that
3273			// should solve this issue.
3274			if alloc.Job == nil {
3275				return fmt.Errorf("attempting to upsert allocation %q without a job", alloc.ID)
3276			}
3277		} else {
3278			alloc.CreateIndex = exist.CreateIndex
3279			alloc.ModifyIndex = index
3280			alloc.AllocModifyIndex = index
3281
3282			// Keep the clients task states
3283			alloc.TaskStates = exist.TaskStates
3284
3285			// If the scheduler is marking this allocation as lost we do not
3286			// want to reuse the status of the existing allocation.
3287			if alloc.ClientStatus != structs.AllocClientStatusLost {
3288				alloc.ClientStatus = exist.ClientStatus
3289				alloc.ClientDescription = exist.ClientDescription
3290			}
3291
3292			// The job has been denormalized so re-attach the original job
3293			if alloc.Job == nil {
3294				alloc.Job = exist.Job
3295			}
3296		}
3297
3298		// OPTIMIZATION:
3299		// These should be given a map of new to old allocation and the updates
3300		// should be one on all changes. The current implementation causes O(n)
3301		// lookups/copies/insertions rather than O(1)
3302		if err := s.updateDeploymentWithAlloc(index, alloc, exist, txn); err != nil {
3303			return fmt.Errorf("error updating deployment: %v", err)
3304		}
3305
3306		if err := s.updateSummaryWithAlloc(index, alloc, exist, txn); err != nil {
3307			return fmt.Errorf("error updating job summary: %v", err)
3308		}
3309
3310		if err := s.updateEntWithAlloc(index, alloc, exist, txn); err != nil {
3311			return err
3312		}
3313
3314		if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
3315			return err
3316		}
3317
3318		if err := txn.Insert("allocs", alloc); err != nil {
3319			return fmt.Errorf("alloc insert failed: %v", err)
3320		}
3321
3322		if alloc.PreviousAllocation != "" {
3323			prevAlloc, err := txn.First("allocs", "id", alloc.PreviousAllocation)
3324			if err != nil {
3325				return fmt.Errorf("alloc lookup failed: %v", err)
3326			}
3327			existingPrevAlloc, _ := prevAlloc.(*structs.Allocation)
3328			if existingPrevAlloc != nil {
3329				prevAllocCopy := existingPrevAlloc.Copy()
3330				prevAllocCopy.NextAllocation = alloc.ID
3331				prevAllocCopy.ModifyIndex = index
3332				if err := txn.Insert("allocs", prevAllocCopy); err != nil {
3333					return fmt.Errorf("alloc insert failed: %v", err)
3334				}
3335			}
3336		}
3337
3338		// If the allocation is running, force the job to running status.
3339		forceStatus := ""
3340		if !alloc.TerminalStatus() {
3341			forceStatus = structs.JobStatusRunning
3342		}
3343
3344		tuple := structs.NamespacedID{
3345			ID:        alloc.JobID,
3346			Namespace: alloc.Namespace,
3347		}
3348		jobs[tuple] = forceStatus
3349	}
3350
3351	// Update the indexes
3352	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
3353		return fmt.Errorf("index update failed: %v", err)
3354	}
3355
3356	// Set the job's status
3357	if err := s.setJobStatuses(index, txn, jobs, false); err != nil {
3358		return fmt.Errorf("setting job status failed: %v", err)
3359	}
3360
3361	return nil
3362}
3363
3364// UpdateAllocsDesiredTransitions is used to update a set of allocations
3365// desired transitions.
3366func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, index uint64, allocs map[string]*structs.DesiredTransition,
3367	evals []*structs.Evaluation) error {
3368
3369	txn := s.db.WriteTxnMsgT(msgType, index)
3370	defer txn.Abort()
3371
3372	// Handle each of the updated allocations
3373	for id, transition := range allocs {
3374		if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil {
3375			return err
3376		}
3377	}
3378
3379	for _, eval := range evals {
3380		if err := s.nestedUpsertEval(txn, index, eval); err != nil {
3381			return err
3382		}
3383	}
3384
3385	// Update the indexes
3386	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
3387		return fmt.Errorf("index update failed: %v", err)
3388	}
3389
3390	return txn.Commit()
3391}
3392
3393// nestedUpdateAllocDesiredTransition is used to nest an update of an
3394// allocations desired transition
3395func (s *StateStore) nestedUpdateAllocDesiredTransition(
3396	txn *txn, index uint64, allocID string,
3397	transition *structs.DesiredTransition) error {
3398
3399	// Look for existing alloc
3400	existing, err := txn.First("allocs", "id", allocID)
3401	if err != nil {
3402		return fmt.Errorf("alloc lookup failed: %v", err)
3403	}
3404
3405	// Nothing to do if this does not exist
3406	if existing == nil {
3407		return nil
3408	}
3409	exist := existing.(*structs.Allocation)
3410
3411	// Copy everything from the existing allocation
3412	copyAlloc := exist.Copy()
3413
3414	// Merge the desired transitions
3415	copyAlloc.DesiredTransition.Merge(transition)
3416
3417	// Update the modify index
3418	copyAlloc.ModifyIndex = index
3419
3420	// Update the allocation
3421	if err := txn.Insert("allocs", copyAlloc); err != nil {
3422		return fmt.Errorf("alloc insert failed: %v", err)
3423	}
3424
3425	return nil
3426}
3427
3428// AllocByID is used to lookup an allocation by its ID
3429func (s *StateStore) AllocByID(ws memdb.WatchSet, id string) (*structs.Allocation, error) {
3430	txn := s.db.ReadTxn()
3431	return s.allocByIDImpl(txn, ws, id)
3432}
3433
3434// allocByIDImpl retrives an allocation and is called under and existing
3435// transaction. An optional watch set can be passed to add allocations to the
3436// watch set
3437func (s *StateStore) allocByIDImpl(txn Txn, ws memdb.WatchSet, id string) (*structs.Allocation, error) {
3438	watchCh, raw, err := txn.FirstWatch("allocs", "id", id)
3439	if err != nil {
3440		return nil, fmt.Errorf("alloc lookup failed: %v", err)
3441	}
3442
3443	ws.Add(watchCh)
3444
3445	if raw == nil {
3446		return nil, nil
3447	}
3448	alloc := raw.(*structs.Allocation)
3449	return alloc, nil
3450}
3451
3452// AllocsByIDPrefix is used to lookup allocs by prefix
3453func (s *StateStore) AllocsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) {
3454	txn := s.db.ReadTxn()
3455
3456	iter, err := txn.Get("allocs", "id_prefix", id)
3457	if err != nil {
3458		return nil, fmt.Errorf("alloc lookup failed: %v", err)
3459	}
3460
3461	ws.Add(iter.WatchCh())
3462
3463	// Wrap the iterator in a filter
3464	wrap := memdb.NewFilterIterator(iter, allocNamespaceFilter(namespace))
3465	return wrap, nil
3466}
3467
3468// allocNamespaceFilter returns a filter function that filters all allocations
3469// not in the given namespace.
3470func allocNamespaceFilter(namespace string) func(interface{}) bool {
3471	return func(raw interface{}) bool {
3472		alloc, ok := raw.(*structs.Allocation)
3473		if !ok {
3474			return true
3475		}
3476
3477		return alloc.Namespace != namespace
3478	}
3479}
3480
3481// AllocsByIDPrefix is used to lookup allocs by prefix
3482func (s *StateStore) AllocsByIDPrefixAllNSs(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
3483	txn := s.db.ReadTxn()
3484
3485	iter, err := txn.Get("allocs", "id_prefix", prefix)
3486	if err != nil {
3487		return nil, fmt.Errorf("alloc lookup failed: %v", err)
3488	}
3489
3490	ws.Add(iter.WatchCh())
3491
3492	return iter, nil
3493}
3494
3495// AllocsByNode returns all the allocations by node
3496func (s *StateStore) AllocsByNode(ws memdb.WatchSet, node string) ([]*structs.Allocation, error) {
3497	txn := s.db.ReadTxn()
3498
3499	return allocsByNodeTxn(txn, ws, node)
3500}
3501
3502func allocsByNodeTxn(txn ReadTxn, ws memdb.WatchSet, node string) ([]*structs.Allocation, error) {
3503	// Get an iterator over the node allocations, using only the
3504	// node prefix which ignores the terminal status
3505	iter, err := txn.Get("allocs", "node_prefix", node)
3506	if err != nil {
3507		return nil, err
3508	}
3509
3510	ws.Add(iter.WatchCh())
3511
3512	var out []*structs.Allocation
3513	for {
3514		raw := iter.Next()
3515		if raw == nil {
3516			break
3517		}
3518		out = append(out, raw.(*structs.Allocation))
3519	}
3520	return out, nil
3521}
3522
3523// AllocsByNode returns all the allocations by node and terminal status
3524func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error) {
3525	txn := s.db.ReadTxn()
3526
3527	// Get an iterator over the node allocations
3528	iter, err := txn.Get("allocs", "node", node, terminal)
3529	if err != nil {
3530		return nil, err
3531	}
3532
3533	ws.Add(iter.WatchCh())
3534
3535	var out []*structs.Allocation
3536	for {
3537		raw := iter.Next()
3538		if raw == nil {
3539			break
3540		}
3541		out = append(out, raw.(*structs.Allocation))
3542	}
3543	return out, nil
3544}
3545
3546// AllocsByJob returns allocations by job id
3547func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, anyCreateIndex bool) ([]*structs.Allocation, error) {
3548	txn := s.db.ReadTxn()
3549
3550	// Get the job
3551	var job *structs.Job
3552	rawJob, err := txn.First("jobs", "id", namespace, jobID)
3553	if err != nil {
3554		return nil, err
3555	}
3556	if rawJob != nil {
3557		job = rawJob.(*structs.Job)
3558	}
3559
3560	// Get an iterator over the node allocations
3561	iter, err := txn.Get("allocs", "job", namespace, jobID)
3562	if err != nil {
3563		return nil, err
3564	}
3565
3566	ws.Add(iter.WatchCh())
3567
3568	var out []*structs.Allocation
3569	for {
3570		raw := iter.Next()
3571		if raw == nil {
3572			break
3573		}
3574
3575		alloc := raw.(*structs.Allocation)
3576		// If the allocation belongs to a job with the same ID but a different
3577		// create index and we are not getting all the allocations whose Jobs
3578		// matches the same Job ID then we skip it
3579		if !anyCreateIndex && job != nil && alloc.Job.CreateIndex != job.CreateIndex {
3580			continue
3581		}
3582		out = append(out, raw.(*structs.Allocation))
3583	}
3584	return out, nil
3585}
3586
3587// AllocsByEval returns all the allocations by eval id
3588func (s *StateStore) AllocsByEval(ws memdb.WatchSet, evalID string) ([]*structs.Allocation, error) {
3589	txn := s.db.ReadTxn()
3590
3591	// Get an iterator over the eval allocations
3592	iter, err := txn.Get("allocs", "eval", evalID)
3593	if err != nil {
3594		return nil, err
3595	}
3596
3597	ws.Add(iter.WatchCh())
3598
3599	var out []*structs.Allocation
3600	for {
3601		raw := iter.Next()
3602		if raw == nil {
3603			break
3604		}
3605		out = append(out, raw.(*structs.Allocation))
3606	}
3607	return out, nil
3608}
3609
3610// AllocsByDeployment returns all the allocations by deployment id
3611func (s *StateStore) AllocsByDeployment(ws memdb.WatchSet, deploymentID string) ([]*structs.Allocation, error) {
3612	txn := s.db.ReadTxn()
3613
3614	// Get an iterator over the deployments allocations
3615	iter, err := txn.Get("allocs", "deployment", deploymentID)
3616	if err != nil {
3617		return nil, err
3618	}
3619
3620	ws.Add(iter.WatchCh())
3621
3622	var out []*structs.Allocation
3623	for {
3624		raw := iter.Next()
3625		if raw == nil {
3626			break
3627		}
3628		out = append(out, raw.(*structs.Allocation))
3629	}
3630	return out, nil
3631}
3632
3633// Allocs returns an iterator over all the evaluations
3634func (s *StateStore) Allocs(ws memdb.WatchSet) (memdb.ResultIterator, error) {
3635	txn := s.db.ReadTxn()
3636
3637	// Walk the entire table
3638	iter, err := txn.Get("allocs", "id")
3639	if err != nil {
3640		return nil, err
3641	}
3642
3643	ws.Add(iter.WatchCh())
3644
3645	return iter, nil
3646}
3647
3648// AllocsByNamespace returns an iterator over all the allocations in the
3649// namespace
3650func (s *StateStore) AllocsByNamespace(ws memdb.WatchSet, namespace string) (memdb.ResultIterator, error) {
3651	txn := s.db.ReadTxn()
3652	return s.allocsByNamespaceImpl(ws, txn, namespace)
3653}
3654
3655// allocsByNamespaceImpl returns an iterator over all the allocations in the
3656// namespace
3657func (s *StateStore) allocsByNamespaceImpl(ws memdb.WatchSet, txn *txn, namespace string) (memdb.ResultIterator, error) {
3658	// Walk the entire table
3659	iter, err := txn.Get("allocs", "namespace", namespace)
3660	if err != nil {
3661		return nil, err
3662	}
3663
3664	ws.Add(iter.WatchCh())
3665
3666	return iter, nil
3667}
3668
3669// UpsertVaultAccessors is used to register a set of Vault Accessors
3670func (s *StateStore) UpsertVaultAccessor(index uint64, accessors []*structs.VaultAccessor) error {
3671	txn := s.db.WriteTxn(index)
3672	defer txn.Abort()
3673
3674	for _, accessor := range accessors {
3675		// Set the create index
3676		accessor.CreateIndex = index
3677
3678		// Insert the accessor
3679		if err := txn.Insert("vault_accessors", accessor); err != nil {
3680			return fmt.Errorf("accessor insert failed: %v", err)
3681		}
3682	}
3683
3684	if err := txn.Insert("index", &IndexEntry{"vault_accessors", index}); err != nil {
3685		return fmt.Errorf("index update failed: %v", err)
3686	}
3687
3688	return txn.Commit()
3689}
3690
3691// DeleteVaultAccessors is used to delete a set of Vault Accessors
3692func (s *StateStore) DeleteVaultAccessors(index uint64, accessors []*structs.VaultAccessor) error {
3693	txn := s.db.WriteTxn(index)
3694	defer txn.Abort()
3695
3696	// Lookup the accessor
3697	for _, accessor := range accessors {
3698		// Delete the accessor
3699		if err := txn.Delete("vault_accessors", accessor); err != nil {
3700			return fmt.Errorf("accessor delete failed: %v", err)
3701		}
3702	}
3703
3704	if err := txn.Insert("index", &IndexEntry{"vault_accessors", index}); err != nil {
3705		return fmt.Errorf("index update failed: %v", err)
3706	}
3707
3708	return txn.Commit()
3709}
3710
3711// VaultAccessor returns the given Vault accessor
3712func (s *StateStore) VaultAccessor(ws memdb.WatchSet, accessor string) (*structs.VaultAccessor, error) {
3713	txn := s.db.ReadTxn()
3714
3715	watchCh, existing, err := txn.FirstWatch("vault_accessors", "id", accessor)
3716	if err != nil {
3717		return nil, fmt.Errorf("accessor lookup failed: %v", err)
3718	}
3719
3720	ws.Add(watchCh)
3721
3722	if existing != nil {
3723		return existing.(*structs.VaultAccessor), nil
3724	}
3725
3726	return nil, nil
3727}
3728
3729// VaultAccessors returns an iterator of Vault accessors.
3730func (s *StateStore) VaultAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) {
3731	txn := s.db.ReadTxn()
3732
3733	iter, err := txn.Get("vault_accessors", "id")
3734	if err != nil {
3735		return nil, err
3736	}
3737
3738	ws.Add(iter.WatchCh())
3739
3740	return iter, nil
3741}
3742
3743// VaultAccessorsByAlloc returns all the Vault accessors by alloc id
3744func (s *StateStore) VaultAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.VaultAccessor, error) {
3745	txn := s.db.ReadTxn()
3746
3747	// Get an iterator over the accessors
3748	iter, err := txn.Get("vault_accessors", "alloc_id", allocID)
3749	if err != nil {
3750		return nil, err
3751	}
3752
3753	ws.Add(iter.WatchCh())
3754
3755	var out []*structs.VaultAccessor
3756	for {
3757		raw := iter.Next()
3758		if raw == nil {
3759			break
3760		}
3761		out = append(out, raw.(*structs.VaultAccessor))
3762	}
3763	return out, nil
3764}
3765
3766// VaultAccessorsByNode returns all the Vault accessors by node id
3767func (s *StateStore) VaultAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.VaultAccessor, error) {
3768	txn := s.db.ReadTxn()
3769
3770	// Get an iterator over the accessors
3771	iter, err := txn.Get("vault_accessors", "node_id", nodeID)
3772	if err != nil {
3773		return nil, err
3774	}
3775
3776	ws.Add(iter.WatchCh())
3777
3778	var out []*structs.VaultAccessor
3779	for {
3780		raw := iter.Next()
3781		if raw == nil {
3782			break
3783		}
3784		out = append(out, raw.(*structs.VaultAccessor))
3785	}
3786	return out, nil
3787}
3788
3789func indexEntry(table string, index uint64) *IndexEntry {
3790	return &IndexEntry{
3791		Key:   table,
3792		Value: index,
3793	}
3794}
3795
3796const siTokenAccessorTable = "si_token_accessors"
3797
3798// UpsertSITokenAccessors is used to register a set of Service Identity token accessors.
3799func (s *StateStore) UpsertSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error {
3800	txn := s.db.WriteTxn(index)
3801	defer txn.Abort()
3802
3803	for _, accessor := range accessors {
3804		// set the create index
3805		accessor.CreateIndex = index
3806
3807		// insert the accessor
3808		if err := txn.Insert(siTokenAccessorTable, accessor); err != nil {
3809			return errors.Wrap(err, "accessor insert failed")
3810		}
3811	}
3812
3813	// update the index for this table
3814	if err := txn.Insert("index", indexEntry(siTokenAccessorTable, index)); err != nil {
3815		return errors.Wrap(err, "index update failed")
3816	}
3817
3818	return txn.Commit()
3819}
3820
3821// DeleteSITokenAccessors is used to delete a set of Service Identity token accessors.
3822func (s *StateStore) DeleteSITokenAccessors(index uint64, accessors []*structs.SITokenAccessor) error {
3823	txn := s.db.WriteTxn(index)
3824	defer txn.Abort()
3825
3826	// Lookup each accessor
3827	for _, accessor := range accessors {
3828		// Delete the accessor
3829		if err := txn.Delete(siTokenAccessorTable, accessor); err != nil {
3830			return errors.Wrap(err, "accessor delete failed")
3831		}
3832	}
3833
3834	// update the index for this table
3835	if err := txn.Insert("index", indexEntry(siTokenAccessorTable, index)); err != nil {
3836		return errors.Wrap(err, "index update failed")
3837	}
3838
3839	return txn.Commit()
3840}
3841
3842// SITokenAccessor returns the given Service Identity token accessor.
3843func (s *StateStore) SITokenAccessor(ws memdb.WatchSet, accessorID string) (*structs.SITokenAccessor, error) {
3844	txn := s.db.ReadTxn()
3845	defer txn.Abort()
3846
3847	watchCh, existing, err := txn.FirstWatch(siTokenAccessorTable, "id", accessorID)
3848	if err != nil {
3849		return nil, errors.Wrap(err, "accessor lookup failed")
3850	}
3851
3852	ws.Add(watchCh)
3853
3854	if existing != nil {
3855		return existing.(*structs.SITokenAccessor), nil
3856	}
3857
3858	return nil, nil
3859}
3860
3861// SITokenAccessors returns an iterator of Service Identity token accessors.
3862func (s *StateStore) SITokenAccessors(ws memdb.WatchSet) (memdb.ResultIterator, error) {
3863	txn := s.db.ReadTxn()
3864	defer txn.Abort()
3865
3866	iter, err := txn.Get(siTokenAccessorTable, "id")
3867	if err != nil {
3868		return nil, err
3869	}
3870
3871	ws.Add(iter.WatchCh())
3872
3873	return iter, nil
3874}
3875
3876// SITokenAccessorsByAlloc returns all the Service Identity token accessors by alloc ID.
3877func (s *StateStore) SITokenAccessorsByAlloc(ws memdb.WatchSet, allocID string) ([]*structs.SITokenAccessor, error) {
3878	txn := s.db.ReadTxn()
3879	defer txn.Abort()
3880
3881	// Get an iterator over the accessors
3882	iter, err := txn.Get(siTokenAccessorTable, "alloc_id", allocID)
3883	if err != nil {
3884		return nil, err
3885	}
3886
3887	ws.Add(iter.WatchCh())
3888
3889	var result []*structs.SITokenAccessor
3890	for raw := iter.Next(); raw != nil; raw = iter.Next() {
3891		result = append(result, raw.(*structs.SITokenAccessor))
3892	}
3893
3894	return result, nil
3895}
3896
3897// SITokenAccessorsByNode returns all the Service Identity token accessors by node ID.
3898func (s *StateStore) SITokenAccessorsByNode(ws memdb.WatchSet, nodeID string) ([]*structs.SITokenAccessor, error) {
3899	txn := s.db.ReadTxn()
3900	defer txn.Abort()
3901
3902	// Get an iterator over the accessors
3903	iter, err := txn.Get(siTokenAccessorTable, "node_id", nodeID)
3904	if err != nil {
3905		return nil, err
3906	}
3907
3908	ws.Add(iter.WatchCh())
3909
3910	var result []*structs.SITokenAccessor
3911	for raw := iter.Next(); raw != nil; raw = iter.Next() {
3912		result = append(result, raw.(*structs.SITokenAccessor))
3913	}
3914
3915	return result, nil
3916}
3917
3918// UpdateDeploymentStatus is used to make deployment status updates and
3919// potentially make a evaluation
3920func (s *StateStore) UpdateDeploymentStatus(msgType structs.MessageType, index uint64, req *structs.DeploymentStatusUpdateRequest) error {
3921	txn := s.db.WriteTxnMsgT(msgType, index)
3922	defer txn.Abort()
3923
3924	if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil {
3925		return err
3926	}
3927
3928	// Upsert the job if necessary
3929	if req.Job != nil {
3930		if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil {
3931			return err
3932		}
3933	}
3934
3935	// Upsert the optional eval
3936	if req.Eval != nil {
3937		if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil {
3938			return err
3939		}
3940	}
3941
3942	return txn.Commit()
3943}
3944
3945// updateDeploymentStatusImpl is used to make deployment status updates
3946func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.DeploymentStatusUpdate, txn *txn) error {
3947	// Retrieve deployment
3948	ws := memdb.NewWatchSet()
3949	deployment, err := s.deploymentByIDImpl(ws, u.DeploymentID, txn)
3950	if err != nil {
3951		return err
3952	} else if deployment == nil {
3953		return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", u.DeploymentID)
3954	} else if !deployment.Active() {
3955		return fmt.Errorf("Deployment %q has terminal status %q:", deployment.ID, deployment.Status)
3956	}
3957
3958	// Apply the new status
3959	copy := deployment.Copy()
3960	copy.Status = u.Status
3961	copy.StatusDescription = u.StatusDescription
3962	copy.ModifyIndex = index
3963
3964	// Insert the deployment
3965	if err := txn.Insert("deployment", copy); err != nil {
3966		return err
3967	}
3968
3969	// Update the index
3970	if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
3971		return fmt.Errorf("index update failed: %v", err)
3972	}
3973
3974	// If the deployment is being marked as complete, set the job to stable.
3975	if copy.Status == structs.DeploymentStatusSuccessful {
3976		if err := s.updateJobStabilityImpl(index, copy.Namespace, copy.JobID, copy.JobVersion, true, txn); err != nil {
3977			return fmt.Errorf("failed to update job stability: %v", err)
3978		}
3979	}
3980
3981	return nil
3982}
3983
3984// UpdateJobStability updates the stability of the given job and version to the
3985// desired status.
3986func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, jobVersion uint64, stable bool) error {
3987	txn := s.db.WriteTxn(index)
3988	defer txn.Abort()
3989
3990	if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil {
3991		return err
3992	}
3993
3994	return txn.Commit()
3995}
3996
3997// updateJobStabilityImpl updates the stability of the given job and version
3998func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *txn) error {
3999	// Get the job that is referenced
4000	job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn)
4001	if err != nil {
4002		return err
4003	}
4004
4005	// Has already been cleared, nothing to do
4006	if job == nil {
4007		return nil
4008	}
4009
4010	// If the job already has the desired stability, nothing to do
4011	if job.Stable == stable {
4012		return nil
4013	}
4014
4015	copy := job.Copy()
4016	copy.Stable = stable
4017	return s.upsertJobImpl(index, copy, true, txn)
4018}
4019
4020// UpdateDeploymentPromotion is used to promote canaries in a deployment and
4021// potentially make a evaluation
4022func (s *StateStore) UpdateDeploymentPromotion(msgType structs.MessageType, index uint64, req *structs.ApplyDeploymentPromoteRequest) error {
4023	txn := s.db.WriteTxnMsgT(msgType, index)
4024	defer txn.Abort()
4025
4026	// Retrieve deployment and ensure it is not terminal and is active
4027	ws := memdb.NewWatchSet()
4028	deployment, err := s.deploymentByIDImpl(ws, req.DeploymentID, txn)
4029	if err != nil {
4030		return err
4031	} else if deployment == nil {
4032		return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", req.DeploymentID)
4033	} else if !deployment.Active() {
4034		return fmt.Errorf("Deployment %q has terminal status %q:", deployment.ID, deployment.Status)
4035	}
4036
4037	// Retrieve effected allocations
4038	iter, err := txn.Get("allocs", "deployment", req.DeploymentID)
4039	if err != nil {
4040		return err
4041	}
4042
4043	// groupIndex is a map of groups being promoted
4044	groupIndex := make(map[string]struct{}, len(req.Groups))
4045	for _, g := range req.Groups {
4046		groupIndex[g] = struct{}{}
4047	}
4048
4049	// canaryIndex is the set of placed canaries in the deployment
4050	canaryIndex := make(map[string]struct{}, len(deployment.TaskGroups))
4051	for _, dstate := range deployment.TaskGroups {
4052		for _, c := range dstate.PlacedCanaries {
4053			canaryIndex[c] = struct{}{}
4054		}
4055	}
4056
4057	// healthyCounts is a mapping of group to the number of healthy canaries
4058	healthyCounts := make(map[string]int, len(deployment.TaskGroups))
4059
4060	// promotable is the set of allocations that we can move from canary to
4061	// non-canary
4062	var promotable []*structs.Allocation
4063
4064	for {
4065		raw := iter.Next()
4066		if raw == nil {
4067			break
4068		}
4069
4070		alloc := raw.(*structs.Allocation)
4071
4072		// Check that the alloc is a canary
4073		if _, ok := canaryIndex[alloc.ID]; !ok {
4074			continue
4075		}
4076
4077		// Check that the canary is part of a group being promoted
4078		if _, ok := groupIndex[alloc.TaskGroup]; !req.All && !ok {
4079			continue
4080		}
4081
4082		// Ensure the canaries are healthy
4083		if alloc.TerminalStatus() || !alloc.DeploymentStatus.IsHealthy() {
4084			continue
4085		}
4086
4087		healthyCounts[alloc.TaskGroup]++
4088		promotable = append(promotable, alloc)
4089	}
4090
4091	// Determine if we have enough healthy allocations
4092	var unhealthyErr multierror.Error
4093	for tg, dstate := range deployment.TaskGroups {
4094		if _, ok := groupIndex[tg]; !req.All && !ok {
4095			continue
4096		}
4097
4098		need := dstate.DesiredCanaries
4099		if need == 0 {
4100			continue
4101		}
4102
4103		if have := healthyCounts[tg]; have < need {
4104			multierror.Append(&unhealthyErr, fmt.Errorf("Task group %q has %d/%d healthy allocations", tg, have, need))
4105		}
4106	}
4107
4108	if err := unhealthyErr.ErrorOrNil(); err != nil {
4109		return err
4110	}
4111
4112	// Update deployment
4113	copy := deployment.Copy()
4114	copy.ModifyIndex = index
4115	for tg, status := range copy.TaskGroups {
4116		_, ok := groupIndex[tg]
4117		if !req.All && !ok {
4118			continue
4119		}
4120
4121		// reset the progress deadline
4122		if status.ProgressDeadline > 0 && !status.RequireProgressBy.IsZero() {
4123			status.RequireProgressBy = time.Now().Add(status.ProgressDeadline)
4124		}
4125		status.Promoted = true
4126	}
4127
4128	// If the deployment no longer needs promotion, update its status
4129	if !copy.RequiresPromotion() && copy.Status == structs.DeploymentStatusRunning {
4130		copy.StatusDescription = structs.DeploymentStatusDescriptionRunning
4131	}
4132
4133	// Insert the deployment
4134	if err := s.upsertDeploymentImpl(index, copy, txn); err != nil {
4135		return err
4136	}
4137
4138	// Upsert the optional eval
4139	if req.Eval != nil {
4140		if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil {
4141			return err
4142		}
4143	}
4144
4145	// For each promotable allocation remove the canary field
4146	for _, alloc := range promotable {
4147		promoted := alloc.Copy()
4148		promoted.DeploymentStatus.Canary = false
4149		promoted.DeploymentStatus.ModifyIndex = index
4150		promoted.ModifyIndex = index
4151		promoted.AllocModifyIndex = index
4152
4153		if err := txn.Insert("allocs", promoted); err != nil {
4154			return fmt.Errorf("alloc insert failed: %v", err)
4155		}
4156	}
4157
4158	// Update the alloc index
4159	if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
4160		return fmt.Errorf("index update failed: %v", err)
4161	}
4162
4163	return txn.Commit()
4164}
4165
4166// UpdateDeploymentAllocHealth is used to update the health of allocations as
4167// part of the deployment and potentially make a evaluation
4168func (s *StateStore) UpdateDeploymentAllocHealth(msgType structs.MessageType, index uint64, req *structs.ApplyDeploymentAllocHealthRequest) error {
4169	txn := s.db.WriteTxnMsgT(msgType, index)
4170	defer txn.Abort()
4171
4172	// Retrieve deployment and ensure it is not terminal and is active
4173	ws := memdb.NewWatchSet()
4174	deployment, err := s.deploymentByIDImpl(ws, req.DeploymentID, txn)
4175	if err != nil {
4176		return err
4177	} else if deployment == nil {
4178		return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", req.DeploymentID)
4179	} else if !deployment.Active() {
4180		return fmt.Errorf("Deployment %q has terminal status %q:", deployment.ID, deployment.Status)
4181	}
4182
4183	// Update the health status of each allocation
4184	if total := len(req.HealthyAllocationIDs) + len(req.UnhealthyAllocationIDs); total != 0 {
4185		setAllocHealth := func(id string, healthy bool, ts time.Time) error {
4186			existing, err := txn.First("allocs", "id", id)
4187			if err != nil {
4188				return fmt.Errorf("alloc %q lookup failed: %v", id, err)
4189			}
4190			if existing == nil {
4191				return fmt.Errorf("unknown alloc %q", id)
4192			}
4193
4194			old := existing.(*structs.Allocation)
4195			if old.DeploymentID != req.DeploymentID {
4196				return fmt.Errorf("alloc %q is not part of deployment %q", id, req.DeploymentID)
4197			}
4198
4199			// Set the health
4200			copy := old.Copy()
4201			if copy.DeploymentStatus == nil {
4202				copy.DeploymentStatus = &structs.AllocDeploymentStatus{}
4203			}
4204			copy.DeploymentStatus.Healthy = helper.BoolToPtr(healthy)
4205			copy.DeploymentStatus.Timestamp = ts
4206			copy.DeploymentStatus.ModifyIndex = index
4207			copy.ModifyIndex = index
4208
4209			if err := s.updateDeploymentWithAlloc(index, copy, old, txn); err != nil {
4210				return fmt.Errorf("error updating deployment: %v", err)
4211			}
4212
4213			if err := txn.Insert("allocs", copy); err != nil {
4214				return fmt.Errorf("alloc insert failed: %v", err)
4215			}
4216
4217			return nil
4218		}
4219
4220		for _, id := range req.HealthyAllocationIDs {
4221			if err := setAllocHealth(id, true, req.Timestamp); err != nil {
4222				return err
4223			}
4224		}
4225		for _, id := range req.UnhealthyAllocationIDs {
4226			if err := setAllocHealth(id, false, req.Timestamp); err != nil {
4227				return err
4228			}
4229		}
4230
4231		// Update the indexes
4232		if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
4233			return fmt.Errorf("index update failed: %v", err)
4234		}
4235	}
4236
4237	// Update the deployment status as needed.
4238	if req.DeploymentUpdate != nil {
4239		if err := s.updateDeploymentStatusImpl(index, req.DeploymentUpdate, txn); err != nil {
4240			return err
4241		}
4242	}
4243
4244	// Upsert the job if necessary
4245	if req.Job != nil {
4246		if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil {
4247			return err
4248		}
4249	}
4250
4251	// Upsert the optional eval
4252	if req.Eval != nil {
4253		if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil {
4254			return err
4255		}
4256	}
4257
4258	return txn.Commit()
4259}
4260
4261// LastIndex returns the greatest index value for all indexes
4262func (s *StateStore) LatestIndex() (uint64, error) {
4263	indexes, err := s.Indexes()
4264	if err != nil {
4265		return 0, err
4266	}
4267
4268	var max uint64 = 0
4269	for {
4270		raw := indexes.Next()
4271		if raw == nil {
4272			break
4273		}
4274
4275		// Prepare the request struct
4276		idx := raw.(*IndexEntry)
4277
4278		// Determine the max
4279		if idx.Value > max {
4280			max = idx.Value
4281		}
4282	}
4283
4284	return max, nil
4285}
4286
4287// Index finds the matching index value
4288func (s *StateStore) Index(name string) (uint64, error) {
4289	txn := s.db.ReadTxn()
4290
4291	// Lookup the first matching index
4292	out, err := txn.First("index", "id", name)
4293	if err != nil {
4294		return 0, err
4295	}
4296	if out == nil {
4297		return 0, nil
4298	}
4299	return out.(*IndexEntry).Value, nil
4300}
4301
4302// Indexes returns an iterator over all the indexes
4303func (s *StateStore) Indexes() (memdb.ResultIterator, error) {
4304	txn := s.db.ReadTxn()
4305
4306	// Walk the entire nodes table
4307	iter, err := txn.Get("index", "id")
4308	if err != nil {
4309		return nil, err
4310	}
4311	return iter, nil
4312}
4313
4314// ReconcileJobSummaries re-creates summaries for all jobs present in the state
4315// store
4316func (s *StateStore) ReconcileJobSummaries(index uint64) error {
4317	txn := s.db.WriteTxn(index)
4318	defer txn.Abort()
4319
4320	// Get all the jobs
4321	iter, err := txn.Get("jobs", "id")
4322	if err != nil {
4323		return err
4324	}
4325	// COMPAT: Remove after 0.11
4326	// Iterate over jobs to build a list of parent jobs and their children
4327	parentMap := make(map[string][]*structs.Job)
4328	for {
4329		rawJob := iter.Next()
4330		if rawJob == nil {
4331			break
4332		}
4333		job := rawJob.(*structs.Job)
4334		if job.ParentID != "" {
4335			children := parentMap[job.ParentID]
4336			children = append(children, job)
4337			parentMap[job.ParentID] = children
4338		}
4339	}
4340
4341	// Get all the jobs again
4342	iter, err = txn.Get("jobs", "id")
4343	if err != nil {
4344		return err
4345	}
4346
4347	for {
4348		rawJob := iter.Next()
4349		if rawJob == nil {
4350			break
4351		}
4352		job := rawJob.(*structs.Job)
4353
4354		if job.IsParameterized() || job.IsPeriodic() {
4355			// COMPAT: Remove after 0.11
4356
4357			// The following block of code fixes incorrect child summaries due to a bug
4358			// See https://github.com/hashicorp/nomad/issues/3886 for details
4359			rawSummary, err := txn.First("job_summary", "id", job.Namespace, job.ID)
4360			if err != nil {
4361				return err
4362			}
4363			if rawSummary == nil {
4364				continue
4365			}
4366
4367			oldSummary := rawSummary.(*structs.JobSummary)
4368
4369			// Create an empty summary
4370			summary := &structs.JobSummary{
4371				JobID:     job.ID,
4372				Namespace: job.Namespace,
4373				Summary:   make(map[string]structs.TaskGroupSummary),
4374				Children:  &structs.JobChildrenSummary{},
4375			}
4376
4377			// Iterate over children of this job if any to fix summary counts
4378			children := parentMap[job.ID]
4379			for _, childJob := range children {
4380				switch childJob.Status {
4381				case structs.JobStatusPending:
4382					summary.Children.Pending++
4383				case structs.JobStatusDead:
4384					summary.Children.Dead++
4385				case structs.JobStatusRunning:
4386					summary.Children.Running++
4387				}
4388			}
4389
4390			// Insert the job summary if its different
4391			if !reflect.DeepEqual(summary, oldSummary) {
4392				// Set the create index of the summary same as the job's create index
4393				// and the modify index to the current index
4394				summary.CreateIndex = job.CreateIndex
4395				summary.ModifyIndex = index
4396
4397				if err := txn.Insert("job_summary", summary); err != nil {
4398					return fmt.Errorf("error inserting job summary: %v", err)
4399				}
4400			}
4401
4402			// Done with handling a parent job, continue to next
4403			continue
4404		}
4405
4406		// Create a job summary for the job
4407		summary := &structs.JobSummary{
4408			JobID:     job.ID,
4409			Namespace: job.Namespace,
4410			Summary:   make(map[string]structs.TaskGroupSummary),
4411		}
4412		for _, tg := range job.TaskGroups {
4413			summary.Summary[tg.Name] = structs.TaskGroupSummary{}
4414		}
4415
4416		// Find all the allocations for the jobs
4417		iterAllocs, err := txn.Get("allocs", "job", job.Namespace, job.ID)
4418		if err != nil {
4419			return err
4420		}
4421
4422		// Calculate the summary for the job
4423		for {
4424			rawAlloc := iterAllocs.Next()
4425			if rawAlloc == nil {
4426				break
4427			}
4428			alloc := rawAlloc.(*structs.Allocation)
4429
4430			// Ignore the allocation if it doesn't belong to the currently
4431			// registered job. The allocation is checked because of issue #2304
4432			if alloc.Job == nil || alloc.Job.CreateIndex != job.CreateIndex {
4433				continue
4434			}
4435
4436			tg := summary.Summary[alloc.TaskGroup]
4437			switch alloc.ClientStatus {
4438			case structs.AllocClientStatusFailed:
4439				tg.Failed += 1
4440			case structs.AllocClientStatusLost:
4441				tg.Lost += 1
4442			case structs.AllocClientStatusComplete:
4443				tg.Complete += 1
4444			case structs.AllocClientStatusRunning:
4445				tg.Running += 1
4446			case structs.AllocClientStatusPending:
4447				tg.Starting += 1
4448			default:
4449				s.logger.Error("invalid client status set on allocation", "client_status", alloc.ClientStatus, "alloc_id", alloc.ID)
4450			}
4451			summary.Summary[alloc.TaskGroup] = tg
4452		}
4453
4454		// Set the create index of the summary same as the job's create index
4455		// and the modify index to the current index
4456		summary.CreateIndex = job.CreateIndex
4457		summary.ModifyIndex = index
4458
4459		// Insert the job summary
4460		if err := txn.Insert("job_summary", summary); err != nil {
4461			return fmt.Errorf("error inserting job summary: %v", err)
4462		}
4463	}
4464
4465	// Update the indexes table for job summary
4466	if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
4467		return fmt.Errorf("index update failed: %v", err)
4468	}
4469	return txn.Commit()
4470}
4471
4472// setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID.
4473// It takes a map of job IDs to an optional forceStatus string. It returns an
4474// error if the job doesn't exist or setJobStatus fails.
4475func (s *StateStore) setJobStatuses(index uint64, txn *txn,
4476	jobs map[structs.NamespacedID]string, evalDelete bool) error {
4477	for tuple, forceStatus := range jobs {
4478
4479		existing, err := txn.First("jobs", "id", tuple.Namespace, tuple.ID)
4480		if err != nil {
4481			return fmt.Errorf("job lookup failed: %v", err)
4482		}
4483
4484		if existing == nil {
4485			continue
4486		}
4487
4488		if err := s.setJobStatus(index, txn, existing.(*structs.Job), evalDelete, forceStatus); err != nil {
4489			return err
4490		}
4491
4492	}
4493
4494	return nil
4495}
4496
4497// setJobStatus sets the status of the job by looking up associated evaluations
4498// and allocations. evalDelete should be set to true if setJobStatus is being
4499// called because an evaluation is being deleted (potentially because of garbage
4500// collection). If forceStatus is non-empty, the job's status will be set to the
4501// passed status.
4502func (s *StateStore) setJobStatus(index uint64, txn *txn,
4503	job *structs.Job, evalDelete bool, forceStatus string) error {
4504
4505	// Capture the current status so we can check if there is a change
4506	oldStatus := job.Status
4507	newStatus := forceStatus
4508
4509	// If forceStatus is not set, compute the jobs status.
4510	if forceStatus == "" {
4511		var err error
4512		newStatus, err = s.getJobStatus(txn, job, evalDelete)
4513		if err != nil {
4514			return err
4515		}
4516	}
4517
4518	// Fast-path if the job has not changed.
4519	if oldStatus == newStatus {
4520		return nil
4521	}
4522
4523	// Copy and update the existing job
4524	updated := job.Copy()
4525	updated.Status = newStatus
4526	updated.ModifyIndex = index
4527
4528	// Insert the job
4529	if err := txn.Insert("jobs", updated); err != nil {
4530		return fmt.Errorf("job insert failed: %v", err)
4531	}
4532	if err := txn.Insert("index", &IndexEntry{"jobs", index}); err != nil {
4533		return fmt.Errorf("index update failed: %v", err)
4534	}
4535
4536	// Update the children summary
4537	if err := s.setJobSummary(txn, updated, index, oldStatus, newStatus); err != nil {
4538		return fmt.Errorf("job summary update failed %w", err)
4539	}
4540	return nil
4541}
4542
4543func (s *StateStore) setJobSummary(txn *txn, updated *structs.Job, index uint64, oldStatus, newStatus string) error {
4544	if updated.ParentID == "" {
4545		return nil
4546	}
4547
4548	// Try to update the summary of the parent job summary
4549	summaryRaw, err := txn.First("job_summary", "id", updated.Namespace, updated.ParentID)
4550	if err != nil {
4551		return fmt.Errorf("unable to retrieve summary for parent job: %v", err)
4552	}
4553
4554	// Only continue if the summary exists. It could not exist if the parent
4555	// job was removed
4556	if summaryRaw != nil {
4557		existing := summaryRaw.(*structs.JobSummary)
4558		pSummary := existing.Copy()
4559		if pSummary.Children == nil {
4560			pSummary.Children = new(structs.JobChildrenSummary)
4561		}
4562
4563		// Determine the transition and update the correct fields
4564		children := pSummary.Children
4565
4566		// Decrement old status
4567		if oldStatus != "" {
4568			switch oldStatus {
4569			case structs.JobStatusPending:
4570				children.Pending--
4571			case structs.JobStatusRunning:
4572				children.Running--
4573			case structs.JobStatusDead:
4574				children.Dead--
4575			default:
4576				return fmt.Errorf("unknown old job status %q", oldStatus)
4577			}
4578		}
4579
4580		// Increment new status
4581		switch newStatus {
4582		case structs.JobStatusPending:
4583			children.Pending++
4584		case structs.JobStatusRunning:
4585			children.Running++
4586		case structs.JobStatusDead:
4587			children.Dead++
4588		default:
4589			return fmt.Errorf("unknown new job status %q", newStatus)
4590		}
4591
4592		// Update the index
4593		pSummary.ModifyIndex = index
4594
4595		// Insert the summary
4596		if err := txn.Insert("job_summary", pSummary); err != nil {
4597			return fmt.Errorf("job summary insert failed: %v", err)
4598		}
4599		if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
4600			return fmt.Errorf("index update failed: %v", err)
4601		}
4602	}
4603	return nil
4604}
4605
4606func (s *StateStore) getJobStatus(txn *txn, job *structs.Job, evalDelete bool) (string, error) {
4607	// System, Periodic and Parameterized jobs are running until explicitly
4608	// stopped
4609	if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() {
4610		if job.Stop {
4611			return structs.JobStatusDead, nil
4612		}
4613
4614		return structs.JobStatusRunning, nil
4615	}
4616
4617	allocs, err := txn.Get("allocs", "job", job.Namespace, job.ID)
4618	if err != nil {
4619		return "", err
4620	}
4621
4622	// If there is a non-terminal allocation, the job is running.
4623	hasAlloc := false
4624	for alloc := allocs.Next(); alloc != nil; alloc = allocs.Next() {
4625		hasAlloc = true
4626		if !alloc.(*structs.Allocation).TerminalStatus() {
4627			return structs.JobStatusRunning, nil
4628		}
4629	}
4630
4631	evals, err := txn.Get("evals", "job_prefix", job.Namespace, job.ID)
4632	if err != nil {
4633		return "", err
4634	}
4635
4636	hasEval := false
4637	for raw := evals.Next(); raw != nil; raw = evals.Next() {
4638		e := raw.(*structs.Evaluation)
4639
4640		// Filter non-exact matches
4641		if e.JobID != job.ID {
4642			continue
4643		}
4644
4645		hasEval = true
4646		if !e.TerminalStatus() {
4647			return structs.JobStatusPending, nil
4648		}
4649	}
4650
4651	// The job is dead if all the allocations and evals are terminal or if there
4652	// are no evals because of garbage collection.
4653	if evalDelete || hasEval || hasAlloc {
4654		return structs.JobStatusDead, nil
4655	}
4656
4657	return structs.JobStatusPending, nil
4658}
4659
4660// updateSummaryWithJob creates or updates job summaries when new jobs are
4661// upserted or existing ones are updated
4662func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
4663	txn *txn) error {
4664
4665	// Update the job summary
4666	summaryRaw, err := txn.First("job_summary", "id", job.Namespace, job.ID)
4667	if err != nil {
4668		return fmt.Errorf("job summary lookup failed: %v", err)
4669	}
4670
4671	// Get the summary or create if necessary
4672	var summary *structs.JobSummary
4673	hasSummaryChanged := false
4674	if summaryRaw != nil {
4675		summary = summaryRaw.(*structs.JobSummary).Copy()
4676	} else {
4677		summary = &structs.JobSummary{
4678			JobID:       job.ID,
4679			Namespace:   job.Namespace,
4680			Summary:     make(map[string]structs.TaskGroupSummary),
4681			Children:    new(structs.JobChildrenSummary),
4682			CreateIndex: index,
4683		}
4684		hasSummaryChanged = true
4685	}
4686
4687	for _, tg := range job.TaskGroups {
4688		if _, ok := summary.Summary[tg.Name]; !ok {
4689			newSummary := structs.TaskGroupSummary{
4690				Complete: 0,
4691				Failed:   0,
4692				Running:  0,
4693				Starting: 0,
4694			}
4695			summary.Summary[tg.Name] = newSummary
4696			hasSummaryChanged = true
4697		}
4698	}
4699
4700	// The job summary has changed, so update the modify index.
4701	if hasSummaryChanged {
4702		summary.ModifyIndex = index
4703
4704		// Update the indexes table for job summary
4705		if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
4706			return fmt.Errorf("index update failed: %v", err)
4707		}
4708		if err := txn.Insert("job_summary", summary); err != nil {
4709			return err
4710		}
4711	}
4712
4713	return nil
4714}
4715
4716// updateJobScalingPolicies upserts any scaling policies contained in the job and removes
4717// any previous scaling policies that were removed from the job
4718func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error {
4719
4720	ws := memdb.NewWatchSet()
4721
4722	scalingPolicies := job.GetScalingPolicies()
4723	newTargets := map[string]bool{}
4724	for _, p := range scalingPolicies {
4725		newTargets[p.JobKey()] = true
4726	}
4727	// find existing policies that need to be deleted
4728	deletedPolicies := []string{}
4729	iter, err := s.ScalingPoliciesByJobTxn(ws, job.Namespace, job.ID, txn)
4730	if err != nil {
4731		return fmt.Errorf("ScalingPoliciesByJob lookup failed: %v", err)
4732	}
4733	for raw := iter.Next(); raw != nil; raw = iter.Next() {
4734		oldPolicy := raw.(*structs.ScalingPolicy)
4735		if !newTargets[oldPolicy.JobKey()] {
4736			deletedPolicies = append(deletedPolicies, oldPolicy.ID)
4737		}
4738	}
4739	err = s.DeleteScalingPoliciesTxn(index, deletedPolicies, txn)
4740	if err != nil {
4741		return fmt.Errorf("DeleteScalingPolicies of removed policies failed: %v", err)
4742	}
4743
4744	err = s.UpsertScalingPoliciesTxn(index, scalingPolicies, txn)
4745	if err != nil {
4746		return fmt.Errorf("UpsertScalingPolicies of policies failed: %v", err)
4747	}
4748
4749	return nil
4750}
4751
4752// updateJobCSIPlugins runs on job update, and indexes the job in the plugin
4753func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, txn *txn) error {
4754	plugIns := make(map[string]*structs.CSIPlugin)
4755
4756	loop := func(job *structs.Job, delete bool) error {
4757		for _, tg := range job.TaskGroups {
4758			for _, t := range tg.Tasks {
4759				if t.CSIPluginConfig == nil {
4760					continue
4761				}
4762
4763				plugIn, ok := plugIns[t.CSIPluginConfig.ID]
4764				if !ok {
4765					p, err := s.CSIPluginByIDTxn(txn, nil, t.CSIPluginConfig.ID)
4766					if err != nil {
4767						return err
4768					}
4769					if p == nil {
4770						plugIn = structs.NewCSIPlugin(t.CSIPluginConfig.ID, index)
4771					} else {
4772						plugIn = p.Copy()
4773						plugIn.ModifyIndex = index
4774					}
4775					plugIns[plugIn.ID] = plugIn
4776				}
4777
4778				if delete {
4779					plugIn.DeleteJob(job, nil)
4780				} else {
4781					plugIn.AddJob(job, nil)
4782				}
4783			}
4784		}
4785
4786		return nil
4787	}
4788
4789	if prev != nil {
4790		err := loop(prev, true)
4791		if err != nil {
4792			return err
4793		}
4794	}
4795
4796	err := loop(job, false)
4797	if err != nil {
4798		return err
4799	}
4800
4801	for _, plugIn := range plugIns {
4802		err = txn.Insert("csi_plugins", plugIn)
4803		if err != nil {
4804			return fmt.Errorf("csi_plugins insert error: %v", err)
4805		}
4806	}
4807
4808	if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
4809		return fmt.Errorf("index update failed: %v", err)
4810	}
4811
4812	return nil
4813}
4814
4815// updateDeploymentWithAlloc is used to update the deployment state associated
4816// with the given allocation. The passed alloc may be updated if the deployment
4817// status has changed to capture the modify index at which it has changed.
4818func (s *StateStore) updateDeploymentWithAlloc(index uint64, alloc, existing *structs.Allocation, txn *txn) error {
4819	// Nothing to do if the allocation is not associated with a deployment
4820	if alloc.DeploymentID == "" {
4821		return nil
4822	}
4823
4824	// Get the deployment
4825	ws := memdb.NewWatchSet()
4826	deployment, err := s.deploymentByIDImpl(ws, alloc.DeploymentID, txn)
4827	if err != nil {
4828		return err
4829	}
4830	if deployment == nil {
4831		return nil
4832	}
4833
4834	// Retrieve the deployment state object
4835	_, ok := deployment.TaskGroups[alloc.TaskGroup]
4836	if !ok {
4837		// If the task group isn't part of the deployment, the task group wasn't
4838		// part of a rolling update so nothing to do
4839		return nil
4840	}
4841
4842	// Do not modify in-place. Instead keep track of what must be done
4843	placed := 0
4844	healthy := 0
4845	unhealthy := 0
4846
4847	// If there was no existing allocation, this is a placement and we increment
4848	// the placement
4849	existingHealthSet := existing != nil && existing.DeploymentStatus.HasHealth()
4850	allocHealthSet := alloc.DeploymentStatus.HasHealth()
4851	if existing == nil || existing.DeploymentID != alloc.DeploymentID {
4852		placed++
4853	} else if !existingHealthSet && allocHealthSet {
4854		if *alloc.DeploymentStatus.Healthy {
4855			healthy++
4856		} else {
4857			unhealthy++
4858		}
4859	} else if existingHealthSet && allocHealthSet {
4860		// See if it has gone from healthy to unhealthy
4861		if *existing.DeploymentStatus.Healthy && !*alloc.DeploymentStatus.Healthy {
4862			healthy--
4863			unhealthy++
4864		}
4865	}
4866
4867	// Nothing to do
4868	if placed == 0 && healthy == 0 && unhealthy == 0 {
4869		return nil
4870	}
4871
4872	// Update the allocation's deployment status modify index
4873	if alloc.DeploymentStatus != nil && healthy+unhealthy != 0 {
4874		alloc.DeploymentStatus.ModifyIndex = index
4875	}
4876
4877	// Create a copy of the deployment object
4878	deploymentCopy := deployment.Copy()
4879	deploymentCopy.ModifyIndex = index
4880
4881	dstate := deploymentCopy.TaskGroups[alloc.TaskGroup]
4882	dstate.PlacedAllocs += placed
4883	dstate.HealthyAllocs += healthy
4884	dstate.UnhealthyAllocs += unhealthy
4885
4886	// Ensure PlacedCanaries accurately reflects the alloc canary status
4887	if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Canary {
4888		found := false
4889		for _, canary := range dstate.PlacedCanaries {
4890			if alloc.ID == canary {
4891				found = true
4892				break
4893			}
4894		}
4895		if !found {
4896			dstate.PlacedCanaries = append(dstate.PlacedCanaries, alloc.ID)
4897		}
4898	}
4899
4900	// Update the progress deadline
4901	if pd := dstate.ProgressDeadline; pd != 0 {
4902		// If we are the first placed allocation for the deployment start the progress deadline.
4903		if placed != 0 && dstate.RequireProgressBy.IsZero() {
4904			// Use modify time instead of create time because we may in-place
4905			// update the allocation to be part of a new deployment.
4906			dstate.RequireProgressBy = time.Unix(0, alloc.ModifyTime).Add(pd)
4907		} else if healthy != 0 {
4908			if d := alloc.DeploymentStatus.Timestamp.Add(pd); d.After(dstate.RequireProgressBy) {
4909				dstate.RequireProgressBy = d
4910			}
4911		}
4912	}
4913
4914	// Upsert the deployment
4915	if err := s.upsertDeploymentImpl(index, deploymentCopy, txn); err != nil {
4916		return err
4917	}
4918
4919	return nil
4920}
4921
4922// updateSummaryWithAlloc updates the job summary when allocations are updated
4923// or inserted
4924func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocation,
4925	existingAlloc *structs.Allocation, txn *txn) error {
4926
4927	// We don't have to update the summary if the job is missing
4928	if alloc.Job == nil {
4929		return nil
4930	}
4931
4932	summaryRaw, err := txn.First("job_summary", "id", alloc.Namespace, alloc.JobID)
4933	if err != nil {
4934		return fmt.Errorf("unable to lookup job summary for job id %q in namespace %q: %v", alloc.JobID, alloc.Namespace, err)
4935	}
4936
4937	if summaryRaw == nil {
4938		// Check if the job is de-registered
4939		rawJob, err := txn.First("jobs", "id", alloc.Namespace, alloc.JobID)
4940		if err != nil {
4941			return fmt.Errorf("unable to query job: %v", err)
4942		}
4943
4944		// If the job is de-registered then we skip updating it's summary
4945		if rawJob == nil {
4946			return nil
4947		}
4948
4949		return fmt.Errorf("job summary for job %q in namespace %q is not present", alloc.JobID, alloc.Namespace)
4950	}
4951
4952	// Get a copy of the existing summary
4953	jobSummary := summaryRaw.(*structs.JobSummary).Copy()
4954
4955	// Not updating the job summary because the allocation doesn't belong to the
4956	// currently registered job
4957	if jobSummary.CreateIndex != alloc.Job.CreateIndex {
4958		return nil
4959	}
4960
4961	tgSummary, ok := jobSummary.Summary[alloc.TaskGroup]
4962	if !ok {
4963		return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup)
4964	}
4965
4966	summaryChanged := false
4967	if existingAlloc == nil {
4968		switch alloc.DesiredStatus {
4969		case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
4970			s.logger.Error("new allocation inserted into state store with bad desired status",
4971				"alloc_id", alloc.ID, "desired_status", alloc.DesiredStatus)
4972		}
4973		switch alloc.ClientStatus {
4974		case structs.AllocClientStatusPending:
4975			tgSummary.Starting += 1
4976			if tgSummary.Queued > 0 {
4977				tgSummary.Queued -= 1
4978			}
4979			summaryChanged = true
4980		case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed,
4981			structs.AllocClientStatusComplete:
4982			s.logger.Error("new allocation inserted into state store with bad client status",
4983				"alloc_id", alloc.ID, "client_status", alloc.ClientStatus)
4984		}
4985	} else if existingAlloc.ClientStatus != alloc.ClientStatus {
4986		// Incrementing the client of the bin of the current state
4987		switch alloc.ClientStatus {
4988		case structs.AllocClientStatusRunning:
4989			tgSummary.Running += 1
4990		case structs.AllocClientStatusFailed:
4991			tgSummary.Failed += 1
4992		case structs.AllocClientStatusPending:
4993			tgSummary.Starting += 1
4994		case structs.AllocClientStatusComplete:
4995			tgSummary.Complete += 1
4996		case structs.AllocClientStatusLost:
4997			tgSummary.Lost += 1
4998		}
4999
5000		// Decrementing the count of the bin of the last state
5001		switch existingAlloc.ClientStatus {
5002		case structs.AllocClientStatusRunning:
5003			if tgSummary.Running > 0 {
5004				tgSummary.Running -= 1
5005			}
5006		case structs.AllocClientStatusPending:
5007			if tgSummary.Starting > 0 {
5008				tgSummary.Starting -= 1
5009			}
5010		case structs.AllocClientStatusLost:
5011			if tgSummary.Lost > 0 {
5012				tgSummary.Lost -= 1
5013			}
5014		case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
5015		default:
5016			s.logger.Error("invalid old client status for allocation",
5017				"alloc_id", existingAlloc.ID, "client_status", existingAlloc.ClientStatus)
5018		}
5019		summaryChanged = true
5020	}
5021	jobSummary.Summary[alloc.TaskGroup] = tgSummary
5022
5023	if summaryChanged {
5024		jobSummary.ModifyIndex = index
5025
5026		s.updatePluginWithJobSummary(index, jobSummary, alloc, txn)
5027
5028		// Update the indexes table for job summary
5029		if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
5030			return fmt.Errorf("index update failed: %v", err)
5031		}
5032
5033		if err := txn.Insert("job_summary", jobSummary); err != nil {
5034			return fmt.Errorf("updating job summary failed: %v", err)
5035		}
5036	}
5037
5038	return nil
5039}
5040
5041// updatePluginWithAlloc updates the CSI plugins for an alloc when the
5042// allocation is updated or inserted with a terminal server status.
5043func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
5044	txn *txn) error {
5045	if !alloc.ServerTerminalStatus() {
5046		return nil
5047	}
5048
5049	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
5050	for _, t := range tg.Tasks {
5051		if t.CSIPluginConfig != nil {
5052			pluginID := t.CSIPluginConfig.ID
5053			plug, err := s.CSIPluginByIDTxn(txn, nil, pluginID)
5054			if err != nil {
5055				return err
5056			}
5057			if plug == nil {
5058				// plugin may not have been created because it never
5059				// became healthy, just move on
5060				return nil
5061			}
5062			plug = plug.Copy()
5063			err = plug.DeleteAlloc(alloc.ID, alloc.NodeID)
5064			if err != nil {
5065				return err
5066			}
5067			err = updateOrGCPlugin(index, txn, plug)
5068			if err != nil {
5069				return err
5070			}
5071		}
5072	}
5073
5074	return nil
5075}
5076
5077// updatePluginWithJobSummary updates the CSI plugins for a job when the
5078// job summary is updated by an alloc
5079func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.JobSummary, alloc *structs.Allocation,
5080	txn *txn) error {
5081
5082	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
5083	if tg == nil {
5084		return nil
5085	}
5086
5087	for _, t := range tg.Tasks {
5088		if t.CSIPluginConfig != nil {
5089			pluginID := t.CSIPluginConfig.ID
5090			plug, err := s.CSIPluginByIDTxn(txn, nil, pluginID)
5091			if err != nil {
5092				return err
5093			}
5094			if plug == nil {
5095				plug = structs.NewCSIPlugin(pluginID, index)
5096			} else {
5097				plug = plug.Copy()
5098			}
5099
5100			plug.UpdateExpectedWithJob(alloc.Job, summary, alloc.ServerTerminalStatus())
5101			err = updateOrGCPlugin(index, txn, plug)
5102			if err != nil {
5103				return err
5104			}
5105		}
5106	}
5107
5108	return nil
5109}
5110
5111// UpsertACLPolicies is used to create or update a set of ACL policies
5112func (s *StateStore) UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error {
5113	txn := s.db.WriteTxnMsgT(msgType, index)
5114	defer txn.Abort()
5115
5116	for _, policy := range policies {
5117		// Ensure the policy hash is non-nil. This should be done outside the state store
5118		// for performance reasons, but we check here for defense in depth.
5119		if len(policy.Hash) == 0 {
5120			policy.SetHash()
5121		}
5122
5123		// Check if the policy already exists
5124		existing, err := txn.First("acl_policy", "id", policy.Name)
5125		if err != nil {
5126			return fmt.Errorf("policy lookup failed: %v", err)
5127		}
5128
5129		// Update all the indexes
5130		if existing != nil {
5131			policy.CreateIndex = existing.(*structs.ACLPolicy).CreateIndex
5132			policy.ModifyIndex = index
5133		} else {
5134			policy.CreateIndex = index
5135			policy.ModifyIndex = index
5136		}
5137
5138		// Update the policy
5139		if err := txn.Insert("acl_policy", policy); err != nil {
5140			return fmt.Errorf("upserting policy failed: %v", err)
5141		}
5142	}
5143
5144	// Update the indexes tabl
5145	if err := txn.Insert("index", &IndexEntry{"acl_policy", index}); err != nil {
5146		return fmt.Errorf("index update failed: %v", err)
5147	}
5148
5149	return txn.Commit()
5150}
5151
5152// DeleteACLPolicies deletes the policies with the given names
5153func (s *StateStore) DeleteACLPolicies(msgType structs.MessageType, index uint64, names []string) error {
5154	txn := s.db.WriteTxnMsgT(msgType, index)
5155	defer txn.Abort()
5156
5157	// Delete the policy
5158	for _, name := range names {
5159		if _, err := txn.DeleteAll("acl_policy", "id", name); err != nil {
5160			return fmt.Errorf("deleting acl policy failed: %v", err)
5161		}
5162	}
5163	if err := txn.Insert("index", &IndexEntry{"acl_policy", index}); err != nil {
5164		return fmt.Errorf("index update failed: %v", err)
5165	}
5166	return txn.Commit()
5167}
5168
5169// ACLPolicyByName is used to lookup a policy by name
5170func (s *StateStore) ACLPolicyByName(ws memdb.WatchSet, name string) (*structs.ACLPolicy, error) {
5171	txn := s.db.ReadTxn()
5172
5173	watchCh, existing, err := txn.FirstWatch("acl_policy", "id", name)
5174	if err != nil {
5175		return nil, fmt.Errorf("acl policy lookup failed: %v", err)
5176	}
5177	ws.Add(watchCh)
5178
5179	if existing != nil {
5180		return existing.(*structs.ACLPolicy), nil
5181	}
5182	return nil, nil
5183}
5184
5185// ACLPolicyByNamePrefix is used to lookup policies by prefix
5186func (s *StateStore) ACLPolicyByNamePrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
5187	txn := s.db.ReadTxn()
5188
5189	iter, err := txn.Get("acl_policy", "id_prefix", prefix)
5190	if err != nil {
5191		return nil, fmt.Errorf("acl policy lookup failed: %v", err)
5192	}
5193	ws.Add(iter.WatchCh())
5194
5195	return iter, nil
5196}
5197
5198// ACLPolicies returns an iterator over all the acl policies
5199func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) {
5200	txn := s.db.ReadTxn()
5201
5202	// Walk the entire table
5203	iter, err := txn.Get("acl_policy", "id")
5204	if err != nil {
5205		return nil, err
5206	}
5207	ws.Add(iter.WatchCh())
5208	return iter, nil
5209}
5210
5211// UpsertACLTokens is used to create or update a set of ACL tokens
5212func (s *StateStore) UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error {
5213	txn := s.db.WriteTxnMsgT(msgType, index)
5214	defer txn.Abort()
5215
5216	for _, token := range tokens {
5217		// Ensure the policy hash is non-nil. This should be done outside the state store
5218		// for performance reasons, but we check here for defense in depth.
5219		if len(token.Hash) == 0 {
5220			token.SetHash()
5221		}
5222
5223		// Check if the token already exists
5224		existing, err := txn.First("acl_token", "id", token.AccessorID)
5225		if err != nil {
5226			return fmt.Errorf("token lookup failed: %v", err)
5227		}
5228
5229		// Update all the indexes
5230		if existing != nil {
5231			existTK := existing.(*structs.ACLToken)
5232			token.CreateIndex = existTK.CreateIndex
5233			token.ModifyIndex = index
5234
5235			// Do not allow SecretID or create time to change
5236			token.SecretID = existTK.SecretID
5237			token.CreateTime = existTK.CreateTime
5238
5239		} else {
5240			token.CreateIndex = index
5241			token.ModifyIndex = index
5242		}
5243
5244		// Update the token
5245		if err := txn.Insert("acl_token", token); err != nil {
5246			return fmt.Errorf("upserting token failed: %v", err)
5247		}
5248	}
5249
5250	// Update the indexes table
5251	if err := txn.Insert("index", &IndexEntry{"acl_token", index}); err != nil {
5252		return fmt.Errorf("index update failed: %v", err)
5253	}
5254	return txn.Commit()
5255}
5256
5257// DeleteACLTokens deletes the tokens with the given accessor ids
5258func (s *StateStore) DeleteACLTokens(msgType structs.MessageType, index uint64, ids []string) error {
5259	txn := s.db.WriteTxnMsgT(msgType, index)
5260	defer txn.Abort()
5261
5262	// Delete the tokens
5263	for _, id := range ids {
5264		if _, err := txn.DeleteAll("acl_token", "id", id); err != nil {
5265			return fmt.Errorf("deleting acl token failed: %v", err)
5266		}
5267	}
5268	if err := txn.Insert("index", &IndexEntry{"acl_token", index}); err != nil {
5269		return fmt.Errorf("index update failed: %v", err)
5270	}
5271	return txn.Commit()
5272}
5273
5274// ACLTokenByAccessorID is used to lookup a token by accessor ID
5275func (s *StateStore) ACLTokenByAccessorID(ws memdb.WatchSet, id string) (*structs.ACLToken, error) {
5276	if id == "" {
5277		return nil, fmt.Errorf("acl token lookup failed: missing accessor id")
5278	}
5279
5280	txn := s.db.ReadTxn()
5281
5282	watchCh, existing, err := txn.FirstWatch("acl_token", "id", id)
5283	if err != nil {
5284		return nil, fmt.Errorf("acl token lookup failed: %v", err)
5285	}
5286	ws.Add(watchCh)
5287
5288	if existing != nil {
5289		return existing.(*structs.ACLToken), nil
5290	}
5291	return nil, nil
5292}
5293
5294// ACLTokenBySecretID is used to lookup a token by secret ID
5295func (s *StateStore) ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error) {
5296	if secretID == "" {
5297		return nil, fmt.Errorf("acl token lookup failed: missing secret id")
5298	}
5299
5300	txn := s.db.ReadTxn()
5301
5302	watchCh, existing, err := txn.FirstWatch("acl_token", "secret", secretID)
5303	if err != nil {
5304		return nil, fmt.Errorf("acl token lookup failed: %v", err)
5305	}
5306	ws.Add(watchCh)
5307
5308	if existing != nil {
5309		return existing.(*structs.ACLToken), nil
5310	}
5311	return nil, nil
5312}
5313
5314// ACLTokenByAccessorIDPrefix is used to lookup tokens by prefix
5315func (s *StateStore) ACLTokenByAccessorIDPrefix(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
5316	txn := s.db.ReadTxn()
5317
5318	iter, err := txn.Get("acl_token", "id_prefix", prefix)
5319	if err != nil {
5320		return nil, fmt.Errorf("acl token lookup failed: %v", err)
5321	}
5322	ws.Add(iter.WatchCh())
5323	return iter, nil
5324}
5325
5326// ACLTokens returns an iterator over all the tokens
5327func (s *StateStore) ACLTokens(ws memdb.WatchSet) (memdb.ResultIterator, error) {
5328	txn := s.db.ReadTxn()
5329
5330	// Walk the entire table
5331	iter, err := txn.Get("acl_token", "id")
5332	if err != nil {
5333		return nil, err
5334	}
5335	ws.Add(iter.WatchCh())
5336	return iter, nil
5337}
5338
5339// ACLTokensByGlobal returns an iterator over all the tokens filtered by global value
5340func (s *StateStore) ACLTokensByGlobal(ws memdb.WatchSet, globalVal bool) (memdb.ResultIterator, error) {
5341	txn := s.db.ReadTxn()
5342
5343	// Walk the entire table
5344	iter, err := txn.Get("acl_token", "global", globalVal)
5345	if err != nil {
5346		return nil, err
5347	}
5348	ws.Add(iter.WatchCh())
5349	return iter, nil
5350}
5351
5352// CanBootstrapACLToken checks if bootstrapping is possible and returns the reset index
5353func (s *StateStore) CanBootstrapACLToken() (bool, uint64, error) {
5354	txn := s.db.ReadTxn()
5355
5356	// Lookup the bootstrap sentinel
5357	out, err := txn.First("index", "id", "acl_token_bootstrap")
5358	if err != nil {
5359		return false, 0, err
5360	}
5361
5362	// No entry, we haven't bootstrapped yet
5363	if out == nil {
5364		return true, 0, nil
5365	}
5366
5367	// Return the reset index if we've already bootstrapped
5368	return false, out.(*IndexEntry).Value, nil
5369}
5370
5371// BootstrapACLToken is used to create an initial ACL token
5372func (s *StateStore) BootstrapACLTokens(msgType structs.MessageType, index uint64, resetIndex uint64, token *structs.ACLToken) error {
5373	txn := s.db.WriteTxnMsgT(msgType, index)
5374	defer txn.Abort()
5375
5376	// Check if we have already done a bootstrap
5377	existing, err := txn.First("index", "id", "acl_token_bootstrap")
5378	if err != nil {
5379		return fmt.Errorf("bootstrap check failed: %v", err)
5380	}
5381	if existing != nil {
5382		if resetIndex == 0 {
5383			return fmt.Errorf("ACL bootstrap already done")
5384		} else if resetIndex != existing.(*IndexEntry).Value {
5385			return fmt.Errorf("Invalid reset index for ACL bootstrap")
5386		}
5387	}
5388
5389	// Update the Create/Modify time
5390	token.CreateIndex = index
5391	token.ModifyIndex = index
5392
5393	// Insert the token
5394	if err := txn.Insert("acl_token", token); err != nil {
5395		return fmt.Errorf("upserting token failed: %v", err)
5396	}
5397
5398	// Update the indexes table, prevents future bootstrap until reset
5399	if err := txn.Insert("index", &IndexEntry{"acl_token", index}); err != nil {
5400		return fmt.Errorf("index update failed: %v", err)
5401	}
5402	if err := txn.Insert("index", &IndexEntry{"acl_token_bootstrap", index}); err != nil {
5403		return fmt.Errorf("index update failed: %v", err)
5404	}
5405	return txn.Commit()
5406}
5407
5408// UpsertOneTimeToken is used to create or update a set of ACL
5409// tokens. Validating that we're not upserting an already-expired token is
5410// made the responsibility of the caller to facilitate testing.
5411func (s *StateStore) UpsertOneTimeToken(msgType structs.MessageType, index uint64, token *structs.OneTimeToken) error {
5412	txn := s.db.WriteTxnMsgT(msgType, index)
5413	defer txn.Abort()
5414
5415	// we expect the RPC call to set the ExpiresAt
5416	if token.ExpiresAt.IsZero() {
5417		return fmt.Errorf("one-time token must have an ExpiresAt time")
5418	}
5419
5420	// Update all the indexes
5421	token.CreateIndex = index
5422	token.ModifyIndex = index
5423
5424	// Create the token
5425	if err := txn.Insert("one_time_token", token); err != nil {
5426		return fmt.Errorf("upserting one-time token failed: %v", err)
5427	}
5428
5429	// Update the indexes table
5430	if err := txn.Insert("index", &IndexEntry{"one_time_token", index}); err != nil {
5431		return fmt.Errorf("index update failed: %v", err)
5432	}
5433	return txn.Commit()
5434}
5435
5436// DeleteOneTimeTokens deletes the tokens with the given ACLToken Accessor IDs
5437func (s *StateStore) DeleteOneTimeTokens(msgType structs.MessageType, index uint64, ids []string) error {
5438	txn := s.db.WriteTxnMsgT(msgType, index)
5439	defer txn.Abort()
5440
5441	var deleted int
5442	for _, id := range ids {
5443		d, err := txn.DeleteAll("one_time_token", "id", id)
5444		if err != nil {
5445			return fmt.Errorf("deleting one-time token failed: %v", err)
5446		}
5447		deleted += d
5448	}
5449
5450	if deleted > 0 {
5451		if err := txn.Insert("index", &IndexEntry{"one_time_token", index}); err != nil {
5452			return fmt.Errorf("index update failed: %v", err)
5453		}
5454	}
5455	return txn.Commit()
5456}
5457
5458// ExpireOneTimeTokens deletes tokens that have expired
5459func (s *StateStore) ExpireOneTimeTokens(msgType structs.MessageType, index uint64) error {
5460	txn := s.db.WriteTxnMsgT(msgType, index)
5461	defer txn.Abort()
5462
5463	iter, err := s.oneTimeTokensExpiredTxn(txn, nil)
5464	if err != nil {
5465		return err
5466	}
5467
5468	var deleted int
5469	for {
5470		raw := iter.Next()
5471		if raw == nil {
5472			break
5473		}
5474		ott, ok := raw.(*structs.OneTimeToken)
5475		if !ok || ott == nil {
5476			return fmt.Errorf("could not decode one-time token")
5477		}
5478		d, err := txn.DeleteAll("one_time_token", "secret", ott.OneTimeSecretID)
5479		if err != nil {
5480			return fmt.Errorf("deleting one-time token failed: %v", err)
5481		}
5482		deleted += d
5483	}
5484
5485	if deleted > 0 {
5486		if err := txn.Insert("index", &IndexEntry{"one_time_token", index}); err != nil {
5487			return fmt.Errorf("index update failed: %v", err)
5488		}
5489	}
5490	return txn.Commit()
5491}
5492
5493// oneTimeTokensExpiredTxn returns an iterator over all expired one-time tokens
5494func (s *StateStore) oneTimeTokensExpiredTxn(txn *txn, ws memdb.WatchSet) (memdb.ResultIterator, error) {
5495	iter, err := txn.Get("one_time_token", "id")
5496	if err != nil {
5497		return nil, fmt.Errorf("one-time token lookup failed: %v", err)
5498	}
5499
5500	ws.Add(iter.WatchCh())
5501	iter = memdb.NewFilterIterator(iter, expiredOneTimeTokenFilter(time.Now()))
5502	return iter, nil
5503}
5504
5505// OneTimeTokenBySecret is used to lookup a token by secret
5506func (s *StateStore) OneTimeTokenBySecret(ws memdb.WatchSet, secret string) (*structs.OneTimeToken, error) {
5507	if secret == "" {
5508		return nil, fmt.Errorf("one-time token lookup failed: missing secret")
5509	}
5510
5511	txn := s.db.ReadTxn()
5512
5513	watchCh, existing, err := txn.FirstWatch("one_time_token", "secret", secret)
5514	if err != nil {
5515		return nil, fmt.Errorf("one-time token lookup failed: %v", err)
5516	}
5517	ws.Add(watchCh)
5518
5519	if existing != nil {
5520		return existing.(*structs.OneTimeToken), nil
5521	}
5522	return nil, nil
5523}
5524
5525// expiredOneTimeTokenFilter returns a filter function that returns only
5526// expired one-time tokens
5527func expiredOneTimeTokenFilter(now time.Time) func(interface{}) bool {
5528	return func(raw interface{}) bool {
5529		ott, ok := raw.(*structs.OneTimeToken)
5530		if !ok {
5531			return true
5532		}
5533
5534		return ott.ExpiresAt.After(now)
5535	}
5536}
5537
5538// SchedulerConfig is used to get the current Scheduler configuration.
5539func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) {
5540	tx := s.db.ReadTxn()
5541	defer tx.Abort()
5542
5543	// Get the scheduler config
5544	c, err := tx.First("scheduler_config", "id")
5545	if err != nil {
5546		return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err)
5547	}
5548
5549	config, ok := c.(*structs.SchedulerConfiguration)
5550	if !ok {
5551		return 0, nil, nil
5552	}
5553
5554	return config.ModifyIndex, config, nil
5555}
5556
5557// SchedulerSetConfig is used to set the current Scheduler configuration.
5558func (s *StateStore) SchedulerSetConfig(index uint64, config *structs.SchedulerConfiguration) error {
5559	tx := s.db.WriteTxn(index)
5560	defer tx.Abort()
5561
5562	s.schedulerSetConfigTxn(index, tx, config)
5563
5564	return tx.Commit()
5565}
5566
5567func (s *StateStore) ClusterMetadata(ws memdb.WatchSet) (*structs.ClusterMetadata, error) {
5568	txn := s.db.ReadTxn()
5569	defer txn.Abort()
5570
5571	// Get the cluster metadata
5572	watchCh, m, err := txn.FirstWatch("cluster_meta", "id")
5573	if err != nil {
5574		return nil, errors.Wrap(err, "failed cluster metadata lookup")
5575	}
5576	ws.Add(watchCh)
5577
5578	if m != nil {
5579		return m.(*structs.ClusterMetadata), nil
5580	}
5581
5582	return nil, nil
5583}
5584
5585func (s *StateStore) ClusterSetMetadata(index uint64, meta *structs.ClusterMetadata) error {
5586	txn := s.db.WriteTxn(index)
5587	defer txn.Abort()
5588
5589	if err := s.setClusterMetadata(txn, meta); err != nil {
5590		return errors.Wrap(err, "set cluster metadata failed")
5591	}
5592
5593	return txn.Commit()
5594}
5595
5596// WithWriteTransaction executes the passed function within a write transaction,
5597// and returns its result.  If the invocation returns no error, the transaction
5598// is committed; otherwise, it's aborted.
5599func (s *StateStore) WithWriteTransaction(msgType structs.MessageType, index uint64, fn func(Txn) error) error {
5600	tx := s.db.WriteTxnMsgT(msgType, index)
5601	defer tx.Abort()
5602
5603	err := fn(tx)
5604	if err == nil {
5605		return tx.Commit()
5606	}
5607	return err
5608}
5609
5610// SchedulerCASConfig is used to update the scheduler configuration with a
5611// given Raft index. If the CAS index specified is not equal to the last observed index
5612// for the config, then the call is a noop.
5613func (s *StateStore) SchedulerCASConfig(index, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) {
5614	tx := s.db.WriteTxn(index)
5615	defer tx.Abort()
5616
5617	// Check for an existing config
5618	existing, err := tx.First("scheduler_config", "id")
5619	if err != nil {
5620		return false, fmt.Errorf("failed scheduler config lookup: %s", err)
5621	}
5622
5623	// If the existing index does not match the provided CAS
5624	// index arg, then we shouldn't update anything and can safely
5625	// return early here.
5626	e, ok := existing.(*structs.SchedulerConfiguration)
5627	if !ok || (e != nil && e.ModifyIndex != cidx) {
5628		return false, nil
5629	}
5630
5631	s.schedulerSetConfigTxn(index, tx, config)
5632
5633	if err := tx.Commit(); err != nil {
5634		return false, err
5635	}
5636	return true, nil
5637}
5638
5639func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *txn, config *structs.SchedulerConfiguration) error {
5640	// Check for an existing config
5641	existing, err := tx.First("scheduler_config", "id")
5642	if err != nil {
5643		return fmt.Errorf("failed scheduler config lookup: %s", err)
5644	}
5645
5646	// Set the indexes.
5647	if existing != nil {
5648		config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex
5649	} else {
5650		config.CreateIndex = idx
5651	}
5652	config.ModifyIndex = idx
5653
5654	if err := tx.Insert("scheduler_config", config); err != nil {
5655		return fmt.Errorf("failed updating scheduler config: %s", err)
5656	}
5657	return nil
5658}
5659
5660func (s *StateStore) setClusterMetadata(txn *txn, meta *structs.ClusterMetadata) error {
5661	// Check for an existing config, if it exists, verify that the cluster ID matches
5662	existing, err := txn.First("cluster_meta", "id")
5663	if err != nil {
5664		return fmt.Errorf("failed cluster meta lookup: %v", err)
5665	}
5666
5667	if existing != nil {
5668		existingClusterID := existing.(*structs.ClusterMetadata).ClusterID
5669		if meta.ClusterID != existingClusterID && existingClusterID != "" {
5670			// there is a bug in cluster ID detection
5671			return fmt.Errorf("refusing to set new cluster id, previous: %s, new: %s", existingClusterID, meta.ClusterID)
5672		}
5673	}
5674
5675	// update is technically a noop, unless someday we add more / mutable fields
5676	if err := txn.Insert("cluster_meta", meta); err != nil {
5677		return fmt.Errorf("set cluster metadata failed: %v", err)
5678	}
5679
5680	return nil
5681}
5682
5683// UpsertScalingPolicy is used to insert a new scaling policy.
5684func (s *StateStore) UpsertScalingPolicies(index uint64, scalingPolicies []*structs.ScalingPolicy) error {
5685	txn := s.db.WriteTxn(index)
5686	defer txn.Abort()
5687
5688	if err := s.UpsertScalingPoliciesTxn(index, scalingPolicies, txn); err != nil {
5689		return err
5690	}
5691
5692	return txn.Commit()
5693}
5694
5695// upsertScalingPolicy is used to insert a new scaling policy.
5696func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*structs.ScalingPolicy,
5697	txn *txn) error {
5698
5699	hadUpdates := false
5700
5701	for _, policy := range scalingPolicies {
5702		// Check if the scaling policy already exists
5703		// Policy uniqueness is based on target and type
5704		it, err := txn.Get("scaling_policy", "target",
5705			policy.Target[structs.ScalingTargetNamespace],
5706			policy.Target[structs.ScalingTargetJob],
5707			policy.Target[structs.ScalingTargetGroup],
5708			policy.Target[structs.ScalingTargetTask],
5709		)
5710		if err != nil {
5711			return fmt.Errorf("scaling policy lookup failed: %v", err)
5712		}
5713
5714		// Check if type matches
5715		var existing *structs.ScalingPolicy
5716		for raw := it.Next(); raw != nil; raw = it.Next() {
5717			p := raw.(*structs.ScalingPolicy)
5718			if p.Type == policy.Type {
5719				existing = p
5720				break
5721			}
5722		}
5723
5724		// Setup the indexes correctly
5725		if existing != nil {
5726			if !existing.Diff(policy) {
5727				continue
5728			}
5729			policy.ID = existing.ID
5730			policy.CreateIndex = existing.CreateIndex
5731		} else {
5732			// policy.ID must have been set already in Job.Register before log apply
5733			policy.CreateIndex = index
5734		}
5735		policy.ModifyIndex = index
5736
5737		// Insert the scaling policy
5738		hadUpdates = true
5739		if err := txn.Insert("scaling_policy", policy); err != nil {
5740			return err
5741		}
5742	}
5743
5744	// Update the indexes table for scaling policy if we updated any policies
5745	if hadUpdates {
5746		if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
5747			return fmt.Errorf("index update failed: %v", err)
5748		}
5749	}
5750
5751	return nil
5752}
5753
5754// NamespaceByName is used to lookup a namespace by name
5755func (s *StateStore) NamespaceByName(ws memdb.WatchSet, name string) (*structs.Namespace, error) {
5756	txn := s.db.ReadTxn()
5757	return s.namespaceByNameImpl(ws, txn, name)
5758}
5759
5760// namespaceByNameImpl is used to lookup a namespace by name
5761func (s *StateStore) namespaceByNameImpl(ws memdb.WatchSet, txn *txn, name string) (*structs.Namespace, error) {
5762	watchCh, existing, err := txn.FirstWatch(TableNamespaces, "id", name)
5763	if err != nil {
5764		return nil, fmt.Errorf("namespace lookup failed: %v", err)
5765	}
5766	ws.Add(watchCh)
5767
5768	if existing != nil {
5769		return existing.(*structs.Namespace), nil
5770	}
5771	return nil, nil
5772}
5773
5774// namespaceExists returns whether a namespace exists
5775func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) {
5776	if namespace == structs.DefaultNamespace {
5777		return true, nil
5778	}
5779
5780	existing, err := txn.First(TableNamespaces, "id", namespace)
5781	if err != nil {
5782		return false, fmt.Errorf("namespace lookup failed: %v", err)
5783	}
5784
5785	return existing != nil, nil
5786}
5787
5788// NamespacesByNamePrefix is used to lookup namespaces by prefix
5789func (s *StateStore) NamespacesByNamePrefix(ws memdb.WatchSet, namePrefix string) (memdb.ResultIterator, error) {
5790	txn := s.db.ReadTxn()
5791
5792	iter, err := txn.Get(TableNamespaces, "id_prefix", namePrefix)
5793	if err != nil {
5794		return nil, fmt.Errorf("namespaces lookup failed: %v", err)
5795	}
5796	ws.Add(iter.WatchCh())
5797
5798	return iter, nil
5799}
5800
5801// Namespaces returns an iterator over all the namespaces
5802func (s *StateStore) Namespaces(ws memdb.WatchSet) (memdb.ResultIterator, error) {
5803	txn := s.db.ReadTxn()
5804
5805	// Walk the entire namespace table
5806	iter, err := txn.Get(TableNamespaces, "id")
5807	if err != nil {
5808		return nil, err
5809	}
5810	ws.Add(iter.WatchCh())
5811	return iter, nil
5812}
5813
5814func (s *StateStore) NamespaceNames() ([]string, error) {
5815	it, err := s.Namespaces(nil)
5816	if err != nil {
5817		return nil, err
5818	}
5819
5820	nses := []string{}
5821	for {
5822		next := it.Next()
5823		if next == nil {
5824			break
5825		}
5826		ns := next.(*structs.Namespace)
5827		nses = append(nses, ns.Name)
5828	}
5829
5830	return nses, nil
5831}
5832
5833// UpsertNamespace is used to register or update a set of namespaces
5834func (s *StateStore) UpsertNamespaces(index uint64, namespaces []*structs.Namespace) error {
5835	txn := s.db.WriteTxn(index)
5836	defer txn.Abort()
5837
5838	for _, ns := range namespaces {
5839		if err := s.upsertNamespaceImpl(index, txn, ns); err != nil {
5840			return err
5841		}
5842	}
5843
5844	if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil {
5845		return fmt.Errorf("index update failed: %v", err)
5846	}
5847
5848	return txn.Commit()
5849}
5850
5851// upsertNamespaceImpl is used to upsert a namespace
5852func (s *StateStore) upsertNamespaceImpl(index uint64, txn *txn, namespace *structs.Namespace) error {
5853	// Ensure the namespace hash is non-nil. This should be done outside the state store
5854	// for performance reasons, but we check here for defense in depth.
5855	ns := namespace
5856	if len(ns.Hash) == 0 {
5857		ns.SetHash()
5858	}
5859
5860	// Check if the namespace already exists
5861	existing, err := txn.First(TableNamespaces, "id", ns.Name)
5862	if err != nil {
5863		return fmt.Errorf("namespace lookup failed: %v", err)
5864	}
5865
5866	// Setup the indexes correctly and determine which quotas need to be
5867	// reconciled
5868	var oldQuota string
5869	if existing != nil {
5870		exist := existing.(*structs.Namespace)
5871		ns.CreateIndex = exist.CreateIndex
5872		ns.ModifyIndex = index
5873
5874		// Grab the old quota on the namespace
5875		oldQuota = exist.Quota
5876	} else {
5877		ns.CreateIndex = index
5878		ns.ModifyIndex = index
5879	}
5880
5881	// Validate that the quota on the new namespace exists
5882	if ns.Quota != "" {
5883		exists, err := s.quotaSpecExists(txn, ns.Quota)
5884		if err != nil {
5885			return fmt.Errorf("looking up namespace quota %q failed: %v", ns.Quota, err)
5886		} else if !exists {
5887			return fmt.Errorf("namespace %q using non-existent quota %q", ns.Name, ns.Quota)
5888		}
5889	}
5890
5891	// Insert the namespace
5892	if err := txn.Insert(TableNamespaces, ns); err != nil {
5893		return fmt.Errorf("namespace insert failed: %v", err)
5894	}
5895
5896	// Reconcile changed quotas
5897	return s.quotaReconcile(index, txn, ns.Quota, oldQuota)
5898}
5899
5900// DeleteNamespaces is used to remove a set of namespaces
5901func (s *StateStore) DeleteNamespaces(index uint64, names []string) error {
5902	txn := s.db.WriteTxn(index)
5903	defer txn.Abort()
5904
5905	for _, name := range names {
5906		// Lookup the namespace
5907		existing, err := txn.First(TableNamespaces, "id", name)
5908		if err != nil {
5909			return fmt.Errorf("namespace lookup failed: %v", err)
5910		}
5911		if existing == nil {
5912			return fmt.Errorf("namespace not found")
5913		}
5914
5915		ns := existing.(*structs.Namespace)
5916		if ns.Name == structs.DefaultNamespace {
5917			return fmt.Errorf("default namespace can not be deleted")
5918		}
5919
5920		// Ensure that the namespace doesn't have any non-terminal jobs
5921		iter, err := s.jobsByNamespaceImpl(nil, name, txn)
5922		if err != nil {
5923			return err
5924		}
5925
5926		for {
5927			raw := iter.Next()
5928			if raw == nil {
5929				break
5930			}
5931			job := raw.(*structs.Job)
5932
5933			if job.Status != structs.JobStatusDead {
5934				return fmt.Errorf("namespace %q contains at least one non-terminal job %q. "+
5935					"All jobs must be terminal in namespace before it can be deleted", name, job.ID)
5936			}
5937		}
5938
5939		// Delete the namespace
5940		if err := txn.Delete(TableNamespaces, existing); err != nil {
5941			return fmt.Errorf("namespace deletion failed: %v", err)
5942		}
5943	}
5944
5945	if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil {
5946		return fmt.Errorf("index update failed: %v", err)
5947	}
5948
5949	return txn.Commit()
5950}
5951
5952func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error {
5953	txn := s.db.WriteTxn(index)
5954	defer txn.Abort()
5955
5956	err := s.DeleteScalingPoliciesTxn(index, ids, txn)
5957	if err == nil {
5958		return txn.Commit()
5959	}
5960
5961	return err
5962}
5963
5964// DeleteScalingPolicies is used to delete a set of scaling policies by ID
5965func (s *StateStore) DeleteScalingPoliciesTxn(index uint64, ids []string, txn *txn) error {
5966	if len(ids) == 0 {
5967		return nil
5968	}
5969
5970	for _, id := range ids {
5971		// Lookup the scaling policy
5972		existing, err := txn.First("scaling_policy", "id", id)
5973		if err != nil {
5974			return fmt.Errorf("scaling policy lookup failed: %v", err)
5975		}
5976		if existing == nil {
5977			return fmt.Errorf("scaling policy not found")
5978		}
5979
5980		// Delete the scaling policy
5981		if err := txn.Delete("scaling_policy", existing); err != nil {
5982			return fmt.Errorf("scaling policy delete failed: %v", err)
5983		}
5984	}
5985
5986	if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
5987		return fmt.Errorf("index update failed: %v", err)
5988	}
5989
5990	return nil
5991}
5992
5993// ScalingPolicies returns an iterator over all the scaling policies
5994func (s *StateStore) ScalingPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error) {
5995	txn := s.db.ReadTxn()
5996
5997	// Walk the entire scaling_policy table
5998	iter, err := txn.Get("scaling_policy", "id")
5999	if err != nil {
6000		return nil, err
6001	}
6002
6003	ws.Add(iter.WatchCh())
6004
6005	return iter, nil
6006}
6007
6008// ScalingPoliciesByTypePrefix returns an iterator over scaling policies with a certain type prefix.
6009func (s *StateStore) ScalingPoliciesByTypePrefix(ws memdb.WatchSet, t string) (memdb.ResultIterator, error) {
6010	txn := s.db.ReadTxn()
6011
6012	iter, err := txn.Get("scaling_policy", "type_prefix", t)
6013	if err != nil {
6014		return nil, err
6015	}
6016
6017	ws.Add(iter.WatchCh())
6018	return iter, nil
6019}
6020
6021func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, typ string) (memdb.ResultIterator, error) {
6022	txn := s.db.ReadTxn()
6023
6024	iter, err := txn.Get("scaling_policy", "target_prefix", namespace)
6025	if err != nil {
6026		return nil, err
6027	}
6028
6029	ws.Add(iter.WatchCh())
6030
6031	// Wrap the iterator in a filter to exact match the namespace
6032	iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))
6033
6034	// If policy type is specified as well, wrap again
6035	if typ != "" {
6036		iter = memdb.NewFilterIterator(iter, func(raw interface{}) bool {
6037			p, ok := raw.(*structs.ScalingPolicy)
6038			if !ok {
6039				return true
6040			}
6041			return !strings.HasPrefix(p.Type, typ)
6042		})
6043	}
6044
6045	return iter, nil
6046}
6047
6048func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID, policyType string) (memdb.ResultIterator,
6049	error) {
6050	txn := s.db.ReadTxn()
6051	iter, err := s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
6052	if err != nil {
6053		return nil, err
6054	}
6055
6056	if policyType == "" {
6057		return iter, nil
6058	}
6059
6060	filter := func(raw interface{}) bool {
6061		p, ok := raw.(*structs.ScalingPolicy)
6062		if !ok {
6063			return true
6064		}
6065		return policyType != p.Type
6066	}
6067
6068	return memdb.NewFilterIterator(iter, filter), nil
6069}
6070
6071func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,
6072	txn *txn) (memdb.ResultIterator, error) {
6073
6074	iter, err := txn.Get("scaling_policy", "target_prefix", namespace, jobID)
6075	if err != nil {
6076		return nil, err
6077	}
6078
6079	ws.Add(iter.WatchCh())
6080
6081	filter := func(raw interface{}) bool {
6082		d, ok := raw.(*structs.ScalingPolicy)
6083		if !ok {
6084			return true
6085		}
6086
6087		return d.Target[structs.ScalingTargetJob] != jobID
6088	}
6089
6090	// Wrap the iterator in a filter
6091	wrap := memdb.NewFilterIterator(iter, filter)
6092	return wrap, nil
6093}
6094
6095func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) {
6096	txn := s.db.ReadTxn()
6097
6098	watchCh, existing, err := txn.FirstWatch("scaling_policy", "id", id)
6099	if err != nil {
6100		return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
6101	}
6102	ws.Add(watchCh)
6103
6104	if existing != nil {
6105		return existing.(*structs.ScalingPolicy), nil
6106	}
6107
6108	return nil, nil
6109}
6110
6111// ScalingPolicyByTargetAndType returns a fully-qualified policy against a target and policy type,
6112// or nil if it does not exist. This method does not honor the watchset on the policy type, just the target.
6113func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[string]string, typ string) (*structs.ScalingPolicy,
6114	error) {
6115	txn := s.db.ReadTxn()
6116
6117	namespace := target[structs.ScalingTargetNamespace]
6118	job := target[structs.ScalingTargetJob]
6119	group := target[structs.ScalingTargetGroup]
6120	task := target[structs.ScalingTargetTask]
6121
6122	it, err := txn.Get("scaling_policy", "target", namespace, job, group, task)
6123	if err != nil {
6124		return nil, fmt.Errorf("scaling_policy lookup failed: %v", err)
6125	}
6126
6127	ws.Add(it.WatchCh())
6128
6129	// Check for type
6130	var existing *structs.ScalingPolicy
6131	for raw := it.Next(); raw != nil; raw = it.Next() {
6132		p := raw.(*structs.ScalingPolicy)
6133		if p.Type == typ {
6134			existing = p
6135			break
6136		}
6137	}
6138
6139	if existing != nil {
6140		return existing, nil
6141	}
6142
6143	return nil, nil
6144}
6145
6146func (s *StateStore) ScalingPoliciesByIDPrefix(ws memdb.WatchSet, namespace string, prefix string) (memdb.ResultIterator, error) {
6147	txn := s.db.ReadTxn()
6148
6149	iter, err := txn.Get("scaling_policy", "id_prefix", prefix)
6150	if err != nil {
6151		return nil, fmt.Errorf("scaling policy lookup failed: %v", err)
6152	}
6153
6154	ws.Add(iter.WatchCh())
6155
6156	iter = memdb.NewFilterIterator(iter, scalingPolicyNamespaceFilter(namespace))
6157
6158	return iter, nil
6159}
6160
6161// scalingPolicyNamespaceFilter returns a filter function that filters all
6162// scaling policies not targeting the given namespace.
6163func scalingPolicyNamespaceFilter(namespace string) func(interface{}) bool {
6164	return func(raw interface{}) bool {
6165		p, ok := raw.(*structs.ScalingPolicy)
6166		if !ok {
6167			return true
6168		}
6169
6170		return p.Target[structs.ScalingTargetNamespace] != namespace
6171	}
6172}
6173
6174// StateSnapshot is used to provide a point-in-time snapshot
6175type StateSnapshot struct {
6176	StateStore
6177}
6178
6179// DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the
6180// Allocation for each of the Allocation diffs and merges the updated attributes with
6181// the existing Allocation, and attaches the Job provided
6182func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation) error {
6183	for nodeID, allocs := range nodeAllocations {
6184		denormalizedAllocs, err := s.DenormalizeAllocationSlice(allocs)
6185		if err != nil {
6186			return err
6187		}
6188
6189		nodeAllocations[nodeID] = denormalizedAllocs
6190	}
6191	return nil
6192}
6193
6194// DenormalizeAllocationSlice queries the Allocation for each allocation diff
6195// represented as an Allocation and merges the updated attributes with the existing
6196// Allocation, and attaches the Job provided.
6197//
6198// This should only be called on terminal allocs, particularly stopped or preempted allocs
6199func (s *StateSnapshot) DenormalizeAllocationSlice(allocs []*structs.Allocation) ([]*structs.Allocation, error) {
6200	allocDiffs := make([]*structs.AllocationDiff, len(allocs))
6201	for i, alloc := range allocs {
6202		allocDiffs[i] = alloc.AllocationDiff()
6203	}
6204
6205	return s.DenormalizeAllocationDiffSlice(allocDiffs)
6206}
6207
6208// DenormalizeAllocationDiffSlice queries the Allocation for each AllocationDiff and merges
6209// the updated attributes with the existing Allocation, and attaches the Job provided.
6210//
6211// This should only be called on terminal alloc, particularly stopped or preempted allocs
6212func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.AllocationDiff) ([]*structs.Allocation, error) {
6213	// Output index for denormalized Allocations
6214	j := 0
6215
6216	denormalizedAllocs := make([]*structs.Allocation, len(allocDiffs))
6217	for _, allocDiff := range allocDiffs {
6218		alloc, err := s.AllocByID(nil, allocDiff.ID)
6219		if err != nil {
6220			return nil, fmt.Errorf("alloc lookup failed: %v", err)
6221		}
6222		if alloc == nil {
6223			return nil, fmt.Errorf("alloc %v doesn't exist", allocDiff.ID)
6224		}
6225
6226		// Merge the updates to the Allocation.  Don't update alloc.Job for terminal allocs
6227		// so alloc refers to the latest Job view before destruction and to ease handler implementations
6228		allocCopy := alloc.Copy()
6229
6230		if allocDiff.PreemptedByAllocation != "" {
6231			allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation
6232			allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation)
6233			allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict
6234		} else {
6235			// If alloc is a stopped alloc
6236			allocCopy.DesiredDescription = allocDiff.DesiredDescription
6237			allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
6238			if allocDiff.ClientStatus != "" {
6239				allocCopy.ClientStatus = allocDiff.ClientStatus
6240			}
6241			if allocDiff.FollowupEvalID != "" {
6242				allocCopy.FollowupEvalID = allocDiff.FollowupEvalID
6243			}
6244		}
6245		if allocDiff.ModifyTime != 0 {
6246			allocCopy.ModifyTime = allocDiff.ModifyTime
6247		}
6248
6249		// Update the allocDiff in the slice to equal the denormalized alloc
6250		denormalizedAllocs[j] = allocCopy
6251		j++
6252	}
6253	// Retain only the denormalized Allocations in the slice
6254	denormalizedAllocs = denormalizedAllocs[:j]
6255	return denormalizedAllocs, nil
6256}
6257
6258func getPreemptedAllocDesiredDescription(preemptedByAllocID string) string {
6259	return fmt.Sprintf("Preempted by alloc ID %v", preemptedByAllocID)
6260}
6261
6262// StateRestore is used to optimize the performance when
6263// restoring state by only using a single large transaction
6264// instead of thousands of sub transactions
6265type StateRestore struct {
6266	txn *txn
6267}
6268
6269// Abort is used to abort the restore operation
6270func (s *StateRestore) Abort() {
6271	s.txn.Abort()
6272}
6273
6274// Commit is used to commit the restore operation
6275func (s *StateRestore) Commit() error {
6276	return s.txn.Commit()
6277}
6278
6279// NodeRestore is used to restore a node
6280func (r *StateRestore) NodeRestore(node *structs.Node) error {
6281	if err := r.txn.Insert("nodes", node); err != nil {
6282		return fmt.Errorf("node insert failed: %v", err)
6283	}
6284	return nil
6285}
6286
6287// JobRestore is used to restore a job
6288func (r *StateRestore) JobRestore(job *structs.Job) error {
6289	if err := r.txn.Insert("jobs", job); err != nil {
6290		return fmt.Errorf("job insert failed: %v", err)
6291	}
6292	return nil
6293}
6294
6295// EvalRestore is used to restore an evaluation
6296func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
6297	if err := r.txn.Insert("evals", eval); err != nil {
6298		return fmt.Errorf("eval insert failed: %v", err)
6299	}
6300	return nil
6301}
6302
6303// AllocRestore is used to restore an allocation
6304func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
6305	if err := r.txn.Insert("allocs", alloc); err != nil {
6306		return fmt.Errorf("alloc insert failed: %v", err)
6307	}
6308	return nil
6309}
6310
6311// IndexRestore is used to restore an index
6312func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
6313	if err := r.txn.Insert("index", idx); err != nil {
6314		return fmt.Errorf("index insert failed: %v", err)
6315	}
6316	return nil
6317}
6318
6319// PeriodicLaunchRestore is used to restore a periodic launch.
6320func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) error {
6321	if err := r.txn.Insert("periodic_launch", launch); err != nil {
6322		return fmt.Errorf("periodic launch insert failed: %v", err)
6323	}
6324	return nil
6325}
6326
6327// JobSummaryRestore is used to restore a job summary
6328func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
6329	if err := r.txn.Insert("job_summary", jobSummary); err != nil {
6330		return fmt.Errorf("job summary insert failed: %v", err)
6331	}
6332	return nil
6333}
6334
6335// JobVersionRestore is used to restore a job version
6336func (r *StateRestore) JobVersionRestore(version *structs.Job) error {
6337	if err := r.txn.Insert("job_version", version); err != nil {
6338		return fmt.Errorf("job version insert failed: %v", err)
6339	}
6340	return nil
6341}
6342
6343// DeploymentRestore is used to restore a deployment
6344func (r *StateRestore) DeploymentRestore(deployment *structs.Deployment) error {
6345	if err := r.txn.Insert("deployment", deployment); err != nil {
6346		return fmt.Errorf("deployment insert failed: %v", err)
6347	}
6348	return nil
6349}
6350
6351// VaultAccessorRestore is used to restore a vault accessor
6352func (r *StateRestore) VaultAccessorRestore(accessor *structs.VaultAccessor) error {
6353	if err := r.txn.Insert("vault_accessors", accessor); err != nil {
6354		return fmt.Errorf("vault accessor insert failed: %v", err)
6355	}
6356	return nil
6357}
6358
6359// SITokenAccessorRestore is used to restore an SI token accessor
6360func (r *StateRestore) SITokenAccessorRestore(accessor *structs.SITokenAccessor) error {
6361	if err := r.txn.Insert(siTokenAccessorTable, accessor); err != nil {
6362		return errors.Wrap(err, "si token accessor insert failed")
6363	}
6364	return nil
6365}
6366
6367// ACLPolicyRestore is used to restore an ACL policy
6368func (r *StateRestore) ACLPolicyRestore(policy *structs.ACLPolicy) error {
6369	if err := r.txn.Insert("acl_policy", policy); err != nil {
6370		return fmt.Errorf("inserting acl policy failed: %v", err)
6371	}
6372	return nil
6373}
6374
6375// ACLTokenRestore is used to restore an ACL token
6376func (r *StateRestore) ACLTokenRestore(token *structs.ACLToken) error {
6377	if err := r.txn.Insert("acl_token", token); err != nil {
6378		return fmt.Errorf("inserting acl token failed: %v", err)
6379	}
6380	return nil
6381}
6382
6383// OneTimeTokenRestore is used to restore a one-time token
6384func (r *StateRestore) OneTimeTokenRestore(token *structs.OneTimeToken) error {
6385	if err := r.txn.Insert("one_time_token", token); err != nil {
6386		return fmt.Errorf("inserting one-time token failed: %v", err)
6387	}
6388	return nil
6389}
6390
6391func (r *StateRestore) SchedulerConfigRestore(schedConfig *structs.SchedulerConfiguration) error {
6392	if err := r.txn.Insert("scheduler_config", schedConfig); err != nil {
6393		return fmt.Errorf("inserting scheduler config failed: %s", err)
6394	}
6395	return nil
6396}
6397
6398func (r *StateRestore) ClusterMetadataRestore(meta *structs.ClusterMetadata) error {
6399	if err := r.txn.Insert("cluster_meta", meta); err != nil {
6400		return fmt.Errorf("inserting cluster meta failed: %v", err)
6401	}
6402	return nil
6403}
6404
6405// ScalingPolicyRestore is used to restore a scaling policy
6406func (r *StateRestore) ScalingPolicyRestore(scalingPolicy *structs.ScalingPolicy) error {
6407	if err := r.txn.Insert("scaling_policy", scalingPolicy); err != nil {
6408		return fmt.Errorf("scaling policy insert failed: %v", err)
6409	}
6410	return nil
6411}
6412
6413// CSIPluginRestore is used to restore a CSI plugin
6414func (r *StateRestore) CSIPluginRestore(plugin *structs.CSIPlugin) error {
6415	if err := r.txn.Insert("csi_plugins", plugin); err != nil {
6416		return fmt.Errorf("csi plugin insert failed: %v", err)
6417	}
6418	return nil
6419}
6420
6421// CSIVolumeRestore is used to restore a CSI volume
6422func (r *StateRestore) CSIVolumeRestore(volume *structs.CSIVolume) error {
6423	if err := r.txn.Insert("csi_volumes", volume); err != nil {
6424		return fmt.Errorf("csi volume insert failed: %v", err)
6425	}
6426	return nil
6427}
6428
6429// ScalingEventsRestore is used to restore scaling events for a job
6430func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) error {
6431	if err := r.txn.Insert("scaling_event", jobEvents); err != nil {
6432		return fmt.Errorf("scaling event insert failed: %v", err)
6433	}
6434	return nil
6435}
6436
6437// NamespaceRestore is used to restore a namespace
6438func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error {
6439	if err := r.txn.Insert(TableNamespaces, ns); err != nil {
6440		return fmt.Errorf("namespace insert failed: %v", err)
6441	}
6442	return nil
6443}
6444