package scheduler

import (
	"fmt"
	"sync"
	"time"

	testing "github.com/mitchellh/go-testing-interface"

	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// RejectPlan is used to always reject the entire plan and force a state refresh
type RejectPlan struct {
	Harness *Harness
}

func (r *RejectPlan) SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) {
	result := new(structs.PlanResult)
	result.RefreshIndex = r.Harness.NextIndex()
	return result, r.Harness.State, nil
}

func (r *RejectPlan) UpdateEval(*structs.Evaluation) error {
	return nil
}

func (r *RejectPlan) CreateEval(*structs.Evaluation) error {
	return nil
}

func (r *RejectPlan) ReblockEval(*structs.Evaluation) error {
	return nil
}
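
// A minimal usage sketch (hypothetical test-side code): install RejectPlan as
// the harness planner so that every SubmitPlan call is rejected and the
// scheduler is forced to refresh its state:
//
//	h := NewHarness(t)
//	h.Planner = &RejectPlan{Harness: h}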

// Harness is a lightweight testing harness for schedulers. It manages a state
// store copy and provides the planner interface. It can be extended for various
// testing uses or for invoking the scheduler without side effects.
type Harness struct {
	t     testing.T
	State *state.StateStore

	Planner  Planner
	planLock sync.Mutex

	Plans        []*structs.Plan
	Evals        []*structs.Evaluation
	CreateEvals  []*structs.Evaluation
	ReblockEvals []*structs.Evaluation

	nextIndex     uint64
	nextIndexLock sync.Mutex

	optimizePlan bool
}

// NewHarness is used to make a new testing harness
func NewHarness(t testing.T) *Harness {
	return &Harness{
		t:         t,
		State:     state.TestStateStore(t),
		nextIndex: 1,
	}
}

// NewHarnessWithState creates a new harness with the given state for testing
// purposes.
func NewHarnessWithState(t testing.T, state *state.StateStore) *Harness {
	return &Harness{
		t:         t,
		State:     state,
		nextIndex: 1,
	}
}
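
// A sketch of driving a harness over pre-populated state (hypothetical
// test-side code; the upsert step is a placeholder for whatever fixtures a
// test needs):
//
//	store := state.TestStateStore(t)
//	// ... upsert nodes, jobs, and allocs into store ...
//	h := NewHarnessWithState(t, store)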

// SubmitPlan is used to handle plan submission
func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, error) {
	// Ensure sequential plan application
	h.planLock.Lock()
	defer h.planLock.Unlock()

	// Store the plan
	h.Plans = append(h.Plans, plan)

	// Check for custom planner
	if h.Planner != nil {
		return h.Planner.SubmitPlan(plan)
	}

	// Get the index
	index := h.NextIndex()

	// Prepare the result
	result := new(structs.PlanResult)
	result.NodeUpdate = plan.NodeUpdate
	result.NodeAllocation = plan.NodeAllocation
	result.NodePreemptions = plan.NodePreemptions
	result.AllocIndex = index

	// Flatten evicts and allocs
	now := time.Now().UTC().UnixNano()

	allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation))
	for _, allocList := range plan.NodeAllocation {
		allocsUpdated = append(allocsUpdated, allocList...)
	}
	updateCreateTimestamp(allocsUpdated, now)

	// Setup the update request
	req := structs.ApplyPlanResultsRequest{
		AllocUpdateRequest: structs.AllocUpdateRequest{
			Job: plan.Job,
		},
		Deployment:        plan.Deployment,
		DeploymentUpdates: plan.DeploymentUpdates,
		EvalID:            plan.EvalID,
	}

	// Build the request with either the optimized (diff-based) log format or
	// the legacy full-allocation format.
	if h.optimizePlan {
		stoppedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
		for _, updateList := range plan.NodeUpdate {
			for _, stoppedAlloc := range updateList {
				stoppedAllocDiffs = append(stoppedAllocDiffs, stoppedAlloc.AllocationDiff())
			}
		}
		req.AllocsStopped = stoppedAllocDiffs

		req.AllocsUpdated = allocsUpdated

		preemptedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
		for _, preemptions := range plan.NodePreemptions {
			for _, preemptedAlloc := range preemptions {
				allocDiff := preemptedAlloc.AllocationDiff()
				allocDiff.ModifyTime = now
				preemptedAllocDiffs = append(preemptedAllocDiffs, allocDiff)
			}
		}
		req.AllocsPreempted = preemptedAllocDiffs
	} else {
		// COMPAT 0.11: Handles unoptimized log format
		var allocs []*structs.Allocation

		allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
		for _, updateList := range plan.NodeUpdate {
			allocsStopped = append(allocsStopped, updateList...)
		}
		allocs = append(allocs, allocsStopped...)

		allocs = append(allocs, allocsUpdated...)
		updateCreateTimestamp(allocs, now)

		req.Alloc = allocs

		// Set modify time for preempted allocs and flatten them
		var preemptedAllocs []*structs.Allocation
		for _, preemptions := range result.NodePreemptions {
			for _, alloc := range preemptions {
				alloc.ModifyTime = now
				preemptedAllocs = append(preemptedAllocs, alloc)
			}
		}

		req.NodePreemptions = preemptedAllocs
	}

	// Apply the full plan
	err := h.State.UpsertPlanResults(index, &req)
	return result, nil, err
}

// OptimizePlan is used only by the Harness to set the optimizePlan field,
// since the Harness does not have access to a Server object
func (h *Harness) OptimizePlan(optimize bool) {
	h.optimizePlan = optimize
}

func updateCreateTimestamp(allocations []*structs.Allocation, now int64) {
	// Set the time the alloc was applied for the first time. This can be used
	// to approximate the scheduling time.
	for _, alloc := range allocations {
		if alloc.CreateTime == 0 {
			alloc.CreateTime = now
		}
	}
}
191
192func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
193	// Ensure sequential plan application
194	h.planLock.Lock()
195	defer h.planLock.Unlock()
196
197	// Store the eval
198	h.Evals = append(h.Evals, eval)
199
200	// Check for custom planner
201	if h.Planner != nil {
202		return h.Planner.UpdateEval(eval)
203	}
204	return nil
205}
206
207func (h *Harness) CreateEval(eval *structs.Evaluation) error {
208	// Ensure sequential plan application
209	h.planLock.Lock()
210	defer h.planLock.Unlock()
211
212	// Store the eval
213	h.CreateEvals = append(h.CreateEvals, eval)
214
215	// Check for custom planner
216	if h.Planner != nil {
217		return h.Planner.CreateEval(eval)
218	}
219	return nil
220}
221
222func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
223	// Ensure sequential plan application
224	h.planLock.Lock()
225	defer h.planLock.Unlock()
226
227	// Check that the evaluation was already blocked.
228	ws := memdb.NewWatchSet()
229	old, err := h.State.EvalByID(ws, eval.ID)
230	if err != nil {
231		return err
232	}
233
234	if old == nil {
235		return fmt.Errorf("evaluation does not exist to be reblocked")
236	}
237	if old.Status != structs.EvalStatusBlocked {
238		return fmt.Errorf("evaluation %q is not already in a blocked state", old.ID)
239	}
240
241	h.ReblockEvals = append(h.ReblockEvals, eval)
242	return nil
243}

// NextIndex returns the next state index, incrementing the internal counter
func (h *Harness) NextIndex() uint64 {
	h.nextIndexLock.Lock()
	defer h.nextIndexLock.Unlock()
	idx := h.nextIndex
	h.nextIndex++
	return idx
}

// Snapshot is used to snapshot the current state
func (h *Harness) Snapshot() State {
	snap, _ := h.State.Snapshot()
	return snap
}

// Scheduler returns a new scheduler over a snapshot of the current state,
// using the harness for planning.
func (h *Harness) Scheduler(factory Factory) Scheduler {
	logger := testlog.HCLogger(h.t)
	return factory(logger, h.Snapshot(), h)
}

// Process is used to process an evaluation given a factory
// function to create the scheduler
func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
	sched := h.Scheduler(factory)
	return sched.Process(eval)
}
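
// An end-to-end usage sketch (hypothetical test flow; it assumes a scheduler
// Factory such as NewServiceScheduler from this package and a pre-built eval
// whose fixtures have already been upserted into state):
//
//	h := NewHarness(t)
//	// ... upsert a node, a job, and the eval's fixtures into h.State ...
//	if err := h.Process(NewServiceScheduler, eval); err != nil {
//		t.Fatalf("err: %v", err)
//	}
//	h.AssertEvalStatus(t, structs.EvalStatusComplete)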

// AssertEvalStatus asserts that exactly one eval update was submitted and
// that it has the given status.
func (h *Harness) AssertEvalStatus(t testing.T, state string) {
	if len(h.Evals) != 1 {
		t.Fatalf("expected exactly one eval update, got: %#v", h.Evals)
	}
	update := h.Evals[0]

	if update.Status != state {
		t.Fatalf("expected eval status %q, got: %#v", state, update)
	}
}