/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.nio.ByteBuffer

import org.scalatest.BeforeAndAfterEach

import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging

// A no-op backend: the tests in this suite hand resource offers to the scheduler
// directly, so the backend itself never has to do anything.
class FakeSchedulerBackend extends SchedulerBackend {
  def start() {}
  def stop() {}
  def reviveOffers() {}
  def defaultParallelism(): Int = 1
}

class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
    with Logging {

  var failedTaskSetException: Option[Throwable] = None
  var failedTaskSetReason: String = null
  var failedTaskSet = false

  var taskScheduler: TaskSchedulerImpl = null
  var dagScheduler: DAGScheduler = null

  override def beforeEach(): Unit = {
    super.beforeEach()
    failedTaskSet = false
    failedTaskSetException = None
    failedTaskSetReason = null
  }

  override def afterEach(): Unit = {
    super.afterEach()
    if (taskScheduler != null) {
      taskScheduler.stop()
      taskScheduler = null
    }
    if (dagScheduler != null) {
      dagScheduler.stop()
      dagScheduler = null
    }
  }

  def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
    confs.foreach { case (k, v) =>
      conf.set(k, v)
    }
    sc = new SparkContext(conf)
    taskScheduler = new TaskSchedulerImpl(sc)
    taskScheduler.initialize(new FakeSchedulerBackend)
    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
    dagScheduler = new DAGScheduler(sc, taskScheduler) {
      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
      override def executorAdded(execId: String, host: String): Unit = {}
      override def taskSetFailed(
          taskSet: TaskSet,
          reason: String,
          exception: Option[Throwable]): Unit = {
        // Normally the DAGScheduler puts this in the event loop, which will eventually fail
        // dependent jobs.
        failedTaskSet = true
        failedTaskSetReason = reason
        failedTaskSetException = exception
      }
    }
    taskScheduler
  }
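
  // The tests below drive scheduling synchronously by calling resourceOffers() themselves;
  // task set failures are recorded in the failedTaskSet* fields instead of failing a real job.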

  test("Scheduler does not always schedule tasks on the same workers") {
    val taskScheduler = setupScheduler()
    val numFreeCores = 1
    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
      new WorkerOffer("executor1", "host1", numFreeCores))
    // Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
    // get scheduled on the same executor. While there is a chance this test will fail
    // because the task randomly gets placed on the first executor all 1000 times, the
    // probability of that happening is 2^-1000 (so sufficiently small to be considered
    // negligible).
    val numTrials = 1000
    val selectedExecutorIds = 1.to(numTrials).map { _ =>
      val taskSet = FakeTask.createTaskSet(1)
      taskScheduler.submitTasks(taskSet)
      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
      assert(1 === taskDescriptions.length)
      taskDescriptions(0).executorId
    }
    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
    assert(count > 0)
    assert(count < numTrials)
    assert(!failedTaskSet)
  }

  test("Scheduler correctly accounts for multiple CPUs per task") {
    val taskCpus = 2
    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
    // Give zero-core offers. Should not generate any tasks.
    val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
      new WorkerOffer("executor1", "host1", 0))
    val taskSet = FakeTask.createTaskSet(1)
    taskScheduler.submitTasks(taskSet)
    var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten
    assert(0 === taskDescriptions.length)

    // No tasks should run: each executor has only 1 core free, but each task needs 2.
    val numFreeCores = 1
    val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
      new WorkerOffer("executor1", "host1", numFreeCores))
    taskScheduler.submitTasks(taskSet)
    taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
    assert(0 === taskDescriptions.length)

    // Now change the offers to have 2 cores in one executor and verify that it
    // is chosen.
    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
      new WorkerOffer("executor1", "host1", numFreeCores))
    taskScheduler.submitTasks(taskSet)
    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    assert(1 === taskDescriptions.length)
    assert("executor0" === taskDescriptions(0).executorId)
    assert(!failedTaskSet)
  }

  test("Scheduler does not crash when tasks are not serializable") {
    val taskCpus = 2
    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
    val numFreeCores = 1
    val taskSet = new TaskSet(
      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
      new WorkerOffer("executor1", "host1", numFreeCores))
    taskScheduler.submitTasks(taskSet)
    var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    assert(0 === taskDescriptions.length)
    assert(failedTaskSet)
    assert(failedTaskSetReason.contains("Failed to serialize task"))

    // Now check that we can still submit tasks.
    // Even if one of the task sets has not-serializable tasks, the other task set should
    // still be processed without error.
    taskScheduler.submitTasks(FakeTask.createTaskSet(1))
    taskScheduler.submitTasks(taskSet)
    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
  }

  test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
    val taskScheduler = setupScheduler()
    val attempt1 = FakeTask.createTaskSet(1, 0)
    val attempt2 = FakeTask.createTaskSet(1, 1)
    taskScheduler.submitTasks(attempt1)
    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) }

    // OK to submit multiple attempts if the previous attempts are all zombie
    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
      .get.isZombie = true
    taskScheduler.submitTasks(attempt2)
    val attempt3 = FakeTask.createTaskSet(1, 2)
    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) }
    taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
      .get.isZombie = true
    taskScheduler.submitTasks(attempt3)
    assert(!failedTaskSet)
  }

  test("don't schedule more tasks after a taskset is zombie") {
    val taskScheduler = setupScheduler()

    val numFreeCores = 1
    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
    val attempt1 = FakeTask.createTaskSet(10)

    // submit attempt 1, offer some resources, some tasks get scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
    assert(1 === taskDescriptions.length)

    // now mark attempt 1 as a zombie
    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
      .get.isZombie = true
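    // ("zombie" means the task set manager stays registered while its already-running tasks
    // finish, but the scheduler must not launch any new tasks from it)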

    // don't schedule anything on another resource offer
    val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(0 === taskDescriptions2.length)

    // if we schedule another attempt for the same stage, it should get scheduled
    val attempt2 = FakeTask.createTaskSet(10, 1)

    // submit attempt 2, offer some resources, some tasks get scheduled
    taskScheduler.submitTasks(attempt2)
    val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(1 === taskDescriptions3.length)
    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
    assert(mgr.taskSet.stageAttemptId === 1)
    assert(!failedTaskSet)
  }

  test("if a zombie attempt finishes, continue scheduling tasks for non-zombie attempts") {
    val taskScheduler = setupScheduler()

    val numFreeCores = 10
    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
    val attempt1 = FakeTask.createTaskSet(10)

    // submit attempt 1, offer some resources, some tasks get scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
    assert(10 === taskDescriptions.length)

    // now mark attempt 1 as a zombie
    val mgr1 = taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId).get
    mgr1.isZombie = true

    // don't schedule anything on another resource offer
    val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(0 === taskDescriptions2.length)

    // submit attempt 2
    val attempt2 = FakeTask.createTaskSet(10, 1)
    taskScheduler.submitTasks(attempt2)

    // attempt 1 finished (this can happen even if it was marked zombie earlier -- all tasks were
    // already submitted, and then they finish)
    taskScheduler.taskSetFinished(mgr1)

    // now with another resource offer, we should still schedule all the tasks in attempt 2
    val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(10 === taskDescriptions3.length)

    taskDescriptions3.foreach { task =>
      val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
      assert(mgr.taskSet.stageAttemptId === 1)
    }
    assert(!failedTaskSet)
  }

  test("tasks are not re-scheduled while executor loss reason is pending") {
    val taskScheduler = setupScheduler()

    val e0Offers = IndexedSeq(new WorkerOffer("executor0", "host0", 1))
    val e1Offers = IndexedSeq(new WorkerOffer("executor1", "host0", 1))
    val attempt1 = FakeTask.createTaskSet(1)

    // submit attempt 1, offer resources, task gets scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
    assert(1 === taskDescriptions.length)

    // mark executor0 as dead, but with the loss reason still pending
    taskScheduler.executorLost("executor0", LossReasonPending)
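    // (LossReasonPending means the executor is known to be gone but the cause hasn't been
    // reported yet -- e.g. when the cluster manager delivers the reason asynchronously --
    // so the scheduler can't yet tell whether the task itself failed)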

    // offer some more resources on a different executor, nothing should change
    val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten
    assert(0 === taskDescriptions2.length)

    // provide the actual loss reason for executor0
    taskScheduler.executorLost("executor0", SlaveLost("oops"))

    // executor0's tasks should have failed now that the loss reason is known, so offering more
    // resources should make them be scheduled on the new executor.
    val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
    assert(1 === taskDescriptions3.length)
    assert("executor1" === taskDescriptions3(0).executorId)
    assert(!failedTaskSet)
  }

  test("abort stage if executor loss results in unschedulability from previously failed tasks") {
    // Make sure we can detect when a taskset becomes unschedulable from a blacklisting. This
    // test explores a particular corner case -- you may have one task fail, but still be
    // schedulable on another executor. However, that executor may fail later on, leaving the
    // first task with no place to run.
    val taskScheduler = setupScheduler(
      config.BLACKLIST_ENABLED.key -> "true"
    )

    val taskSet = FakeTask.createTaskSet(2)
    taskScheduler.submitTasks(taskSet)
    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
      new WorkerOffer("executor0", "host0", 1),
      new WorkerOffer("executor1", "host1", 1)
    )).flatten
    assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)

    // fail one of the tasks, but leave the other running
    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
    taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
    // at this point, our failed task could run on the other executor, so don't give up the task
    // set yet.
    assert(!failedTaskSet)

    // Now we fail our second executor. The other task can still run on executor0, so make an offer
    // on that executor, and make sure that the other task (not the failed one) is assigned there.
    taskScheduler.executorLost("executor1", SlaveLost("oops"))
    val nextTaskAttempts =
      taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
    // Note: it's OK if some future change makes the scheduler realize here that the taskset has
    // already become unschedulable (though in the current implementation, we're sure it will not).
    assert(nextTaskAttempts.size === 1)
    assert(nextTaskAttempts.head.executorId === "executor0")
    assert(nextTaskAttempts.head.attemptNumber === 1)
    assert(nextTaskAttempts.head.index != failedTask.index)

    // now we should definitely realize that our task set is unschedulable, because the only
    // task left can't be scheduled on any executors due to the blacklist
    taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
    sc.listenerBus.waitUntilEmpty(100000)
    assert(tsm.isZombie)
    assert(failedTaskSet)
    val idx = failedTask.index
    assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
      s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be " +
      s"configured via spark.blacklist.*.")
  }

  test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
    // interaction of SPARK-15865 & SPARK-16106
    // if we have a small number of tasks, we might be able to schedule them all on the first
    // executor. But if those tasks fail, we should still realize there is another executor
    // available and not bail on the job.

    val taskScheduler = setupScheduler(
      config.BLACKLIST_ENABLED.key -> "true"
    )

    val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
    taskScheduler.submitTasks(taskSet)
    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

    val offers = IndexedSeq(
      // each offer has more than enough free cores for the entire task set, so when combined
      // with the locality preferences, we schedule all tasks on one executor
      new WorkerOffer("executor0", "host0", 4),
      new WorkerOffer("executor1", "host1", 4)
    )
    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    assert(firstTaskAttempts.size == 2)
    firstTaskAttempts.foreach { taskAttempt => assert("executor0" === taskAttempt.executorId) }

    // fail all the tasks on the bad executor
    firstTaskAttempts.foreach { taskAttempt =>
      taskScheduler.handleFailedTask(tsm, taskAttempt.taskId, TaskState.FAILED, TaskResultLost)
    }

    // Here is the main check of this test -- we have the same offers again, and we schedule them
    // successfully. Because the scheduler first tries to schedule with locality in mind, at first
    // it won't schedule anything on executor1. But despite that, we don't abort the job. Then the
    // scheduler tries for ANY locality, and successfully schedules tasks on executor1.
    val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    assert(secondTaskAttempts.size == 2)
    secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) }
    assert(!failedTaskSet)
  }

  test("SPARK-16106 locality levels updated if executor added to existing host") {
    val taskScheduler = setupScheduler()

    taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
      (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
    ))

    val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
      new WorkerOffer("executor0", "host0", 1),
      new WorkerOffer("executor1", "host1", 1)
    )).flatten
    // only schedule one task because of locality
    assert(taskDescs.size === 1)

    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
    assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
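    // (no PROCESS_LOCAL yet: the tasks prefer executor2, which hasn't registered, so no
    // process-local placement is currently feasible)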
    // we should know about both executors, even though we only scheduled tasks on one of them
    assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))
    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))

    // when executor2 is added, we should realize that we can run process-local tasks.
    // And we should know it's alive on the host.
    val secondTaskDescs = taskScheduler.resourceOffers(
      IndexedSeq(new WorkerOffer("executor2", "host0", 1))).flatten
    assert(secondTaskDescs.size === 1)
    assert(mgr.myLocalityLevels.toSet ===
      Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
    assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0", "executor2")))
    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))

    // And even if we don't have anything left to schedule, another resource offer on yet another
    // executor should also update the set of live executors
    val thirdTaskDescs = taskScheduler.resourceOffers(
      IndexedSeq(new WorkerOffer("executor3", "host1", 1))).flatten
    assert(thirdTaskDescs.size === 0)
    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
  }

  test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") {
    sc = new SparkContext("local", "TaskSchedulerImplSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)
    taskScheduler.initialize(new FakeSchedulerBackend)
    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
    new DAGScheduler(sc, taskScheduler) {
      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
      override def executorAdded(execId: String, host: String) {}
    }

    val e0Offers = IndexedSeq(WorkerOffer("executor0", "host0", 1))
    val attempt1 = FakeTask.createTaskSet(1)

    // submit attempt 1, offer resources, task gets scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
    assert(1 === taskDescriptions.length)

    // mark executor0 as dead
    taskScheduler.executorLost("executor0", SlaveLost())
    assert(!taskScheduler.isExecutorAlive("executor0"))
    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)

    // Check that state associated with the lost task attempt is cleaned up:
    assert(taskScheduler.taskIdToExecutorId.isEmpty)
    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
  }
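
  // (TaskState.LOST is reported by backends where an executor runs a single task -- e.g. the
  // deprecated Mesos fine-grained mode -- so the scheduler treats it as losing the executor.)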
  test("if a task finishes with TaskState.LOST its executor is marked as dead") {
    sc = new SparkContext("local", "TaskSchedulerImplSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)
    taskScheduler.initialize(new FakeSchedulerBackend)
    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
    new DAGScheduler(sc, taskScheduler) {
      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
      override def executorAdded(execId: String, host: String) {}
    }

    val e0Offers = IndexedSeq(WorkerOffer("executor0", "host0", 1))
    val attempt1 = FakeTask.createTaskSet(1)

    // submit attempt 1, offer resources, task gets scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
    assert(1 === taskDescriptions.length)

    // Report the task as failed with TaskState.LOST
    taskScheduler.statusUpdate(
      tid = taskDescriptions.head.taskId,
      state = TaskState.LOST,
      serializedData = ByteBuffer.allocate(0)
    )

    // Check that state associated with the lost task attempt is cleaned up:
    assert(taskScheduler.taskIdToExecutorId.isEmpty)
    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)

    // Check that the executor has been marked as dead
    assert(!taskScheduler.isExecutorAlive("executor0"))
    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
  }
}