1package scheduler 2 3import ( 4 "fmt" 5 "sync" 6 "time" 7 8 testing "github.com/mitchellh/go-testing-interface" 9 10 "github.com/hashicorp/go-memdb" 11 "github.com/hashicorp/nomad/helper/testlog" 12 "github.com/hashicorp/nomad/nomad/state" 13 "github.com/hashicorp/nomad/nomad/structs" 14) 15 16// RejectPlan is used to always reject the entire plan and force a state refresh 17type RejectPlan struct { 18 Harness *Harness 19} 20 21func (r *RejectPlan) SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) { 22 result := new(structs.PlanResult) 23 result.RefreshIndex = r.Harness.NextIndex() 24 return result, r.Harness.State, nil 25} 26 27func (r *RejectPlan) UpdateEval(eval *structs.Evaluation) error { 28 return nil 29} 30 31func (r *RejectPlan) CreateEval(*structs.Evaluation) error { 32 return nil 33} 34 35func (r *RejectPlan) ReblockEval(*structs.Evaluation) error { 36 return nil 37} 38 39// Harness is a lightweight testing harness for schedulers. It manages a state 40// store copy and provides the planner interface. It can be extended for various 41// testing uses or for invoking the scheduler without side effects. 42type Harness struct { 43 t testing.T 44 State *state.StateStore 45 46 Planner Planner 47 planLock sync.Mutex 48 49 Plans []*structs.Plan 50 Evals []*structs.Evaluation 51 CreateEvals []*structs.Evaluation 52 ReblockEvals []*structs.Evaluation 53 54 nextIndex uint64 55 nextIndexLock sync.Mutex 56 57 optimizePlan bool 58} 59 60// NewHarness is used to make a new testing harness 61func NewHarness(t testing.T) *Harness { 62 state := state.TestStateStore(t) 63 h := &Harness{ 64 t: t, 65 State: state, 66 nextIndex: 1, 67 } 68 return h 69} 70 71// NewHarnessWithState creates a new harness with the given state for testing 72// purposes. 73func NewHarnessWithState(t testing.T, state *state.StateStore) *Harness { 74 return &Harness{ 75 t: t, 76 State: state, 77 nextIndex: 1, 78 } 79} 80 81// SubmitPlan is used to handle plan submission 82func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, error) { 83 // Ensure sequential plan application 84 h.planLock.Lock() 85 defer h.planLock.Unlock() 86 87 // Store the plan 88 h.Plans = append(h.Plans, plan) 89 90 // Check for custom planner 91 if h.Planner != nil { 92 return h.Planner.SubmitPlan(plan) 93 } 94 95 // Get the index 96 index := h.NextIndex() 97 98 // Prepare the result 99 result := new(structs.PlanResult) 100 result.NodeUpdate = plan.NodeUpdate 101 result.NodeAllocation = plan.NodeAllocation 102 result.NodePreemptions = plan.NodePreemptions 103 result.AllocIndex = index 104 105 // Flatten evicts and allocs 106 now := time.Now().UTC().UnixNano() 107 108 allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation)) 109 for _, allocList := range plan.NodeAllocation { 110 allocsUpdated = append(allocsUpdated, allocList...) 111 } 112 updateCreateTimestamp(allocsUpdated, now) 113 114 // Setup the update request 115 req := structs.ApplyPlanResultsRequest{ 116 AllocUpdateRequest: structs.AllocUpdateRequest{ 117 Job: plan.Job, 118 }, 119 Deployment: plan.Deployment, 120 DeploymentUpdates: plan.DeploymentUpdates, 121 EvalID: plan.EvalID, 122 } 123 124 if h.optimizePlan { 125 stoppedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate)) 126 for _, updateList := range plan.NodeUpdate { 127 for _, stoppedAlloc := range updateList { 128 stoppedAllocDiffs = append(stoppedAllocDiffs, stoppedAlloc.AllocationDiff()) 129 } 130 } 131 req.AllocsStopped = stoppedAllocDiffs 132 133 req.AllocsUpdated = allocsUpdated 134 135 preemptedAllocDiffs := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions)) 136 for _, preemptions := range plan.NodePreemptions { 137 for _, preemptedAlloc := range preemptions { 138 allocDiff := preemptedAlloc.AllocationDiff() 139 allocDiff.ModifyTime = now 140 preemptedAllocDiffs = append(preemptedAllocDiffs, allocDiff) 141 } 142 } 143 req.AllocsPreempted = preemptedAllocDiffs 144 } else { 145 // COMPAT 0.11: Handles unoptimized log format 146 var allocs []*structs.Allocation 147 148 allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate)) 149 for _, updateList := range plan.NodeUpdate { 150 allocsStopped = append(allocsStopped, updateList...) 151 } 152 allocs = append(allocs, allocsStopped...) 153 154 allocs = append(allocs, allocsUpdated...) 155 updateCreateTimestamp(allocs, now) 156 157 req.Alloc = allocs 158 159 // Set modify time for preempted allocs and flatten them 160 var preemptedAllocs []*structs.Allocation 161 for _, preemptions := range result.NodePreemptions { 162 for _, alloc := range preemptions { 163 alloc.ModifyTime = now 164 preemptedAllocs = append(preemptedAllocs, alloc) 165 } 166 } 167 168 req.NodePreemptions = preemptedAllocs 169 } 170 171 // Apply the full plan 172 err := h.State.UpsertPlanResults(index, &req) 173 return result, nil, err 174} 175 176// OptimizePlan is a function used only for Harness to help set the optimzePlan field, 177// since Harness doesn't have access to a Server object 178func (h *Harness) OptimizePlan(optimize bool) { 179 h.optimizePlan = optimize 180} 181 182func updateCreateTimestamp(allocations []*structs.Allocation, now int64) { 183 // Set the time the alloc was applied for the first time. This can be used 184 // to approximate the scheduling time. 185 for _, alloc := range allocations { 186 if alloc.CreateTime == 0 { 187 alloc.CreateTime = now 188 } 189 } 190} 191 192func (h *Harness) UpdateEval(eval *structs.Evaluation) error { 193 // Ensure sequential plan application 194 h.planLock.Lock() 195 defer h.planLock.Unlock() 196 197 // Store the eval 198 h.Evals = append(h.Evals, eval) 199 200 // Check for custom planner 201 if h.Planner != nil { 202 return h.Planner.UpdateEval(eval) 203 } 204 return nil 205} 206 207func (h *Harness) CreateEval(eval *structs.Evaluation) error { 208 // Ensure sequential plan application 209 h.planLock.Lock() 210 defer h.planLock.Unlock() 211 212 // Store the eval 213 h.CreateEvals = append(h.CreateEvals, eval) 214 215 // Check for custom planner 216 if h.Planner != nil { 217 return h.Planner.CreateEval(eval) 218 } 219 return nil 220} 221 222func (h *Harness) ReblockEval(eval *structs.Evaluation) error { 223 // Ensure sequential plan application 224 h.planLock.Lock() 225 defer h.planLock.Unlock() 226 227 // Check that the evaluation was already blocked. 228 ws := memdb.NewWatchSet() 229 old, err := h.State.EvalByID(ws, eval.ID) 230 if err != nil { 231 return err 232 } 233 234 if old == nil { 235 return fmt.Errorf("evaluation does not exist to be reblocked") 236 } 237 if old.Status != structs.EvalStatusBlocked { 238 return fmt.Errorf("evaluation %q is not already in a blocked state", old.ID) 239 } 240 241 h.ReblockEvals = append(h.ReblockEvals, eval) 242 return nil 243} 244 245// NextIndex returns the next index 246func (h *Harness) NextIndex() uint64 { 247 h.nextIndexLock.Lock() 248 defer h.nextIndexLock.Unlock() 249 idx := h.nextIndex 250 h.nextIndex += 1 251 return idx 252} 253 254// Snapshot is used to snapshot the current state 255func (h *Harness) Snapshot() State { 256 snap, _ := h.State.Snapshot() 257 return snap 258} 259 260// Scheduler is used to return a new scheduler from 261// a snapshot of current state using the harness for planning. 262func (h *Harness) Scheduler(factory Factory) Scheduler { 263 logger := testlog.HCLogger(h.t) 264 return factory(logger, h.Snapshot(), h) 265} 266 267// Process is used to process an evaluation given a factory 268// function to create the scheduler 269func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error { 270 sched := h.Scheduler(factory) 271 return sched.Process(eval) 272} 273 274func (h *Harness) AssertEvalStatus(t testing.T, state string) { 275 if len(h.Evals) != 1 { 276 t.Fatalf("bad: %#v", h.Evals) 277 } 278 update := h.Evals[0] 279 280 if update.Status != state { 281 t.Fatalf("bad: %#v", update) 282 } 283} 284