/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.util.Random

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.mockito.Mockito.{mock, verify}

import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
import org.apache.spark.util.{AccumulatorV2, ManualClock}

/**
 * A DAGScheduler stand-in that does no stage bookkeeping of its own. It forwards the task
 * lifecycle callbacks it receives into the recording buffers of `taskScheduler`, so tests can
 * assert which tasks started, how each one ended, and which task sets were aborted.
 *
 * @param sc            the SparkContext passed to the real DAGScheduler constructor
 * @param taskScheduler the fake scheduler whose buffers receive the recorded events
 */
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
  extends DAGScheduler(sc) {

  /** Appends the started task's index to `taskScheduler.startedTasks`. */
  override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {
    taskScheduler.startedTasks += taskInfo.index
  }

  /**
   * Stores `reason` in `taskScheduler.endedTasks`, keyed by the task's index. A later end
   * event for the same index overwrites the earlier one.
   */
  override def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Seq[AccumulatorV2[_, _]],
      taskInfo: TaskInfo): Unit = {
    taskScheduler.endedTasks(taskInfo.index) = reason
  }

  // Executor membership notifications are deliberately ignored. Tests change executor
  // liveness through FakeTaskScheduler.addExecutor / removeExecutor instead.
  override def executorAdded(execId: String, host: String): Unit = {}

  override def executorLost(execId: String, reason: ExecutorLossReason): Unit = {}

  /** Appends the aborted task set's id to `taskScheduler.taskSetsFailed`. */
  override def taskSetFailed(
      taskSet: TaskSet,
      reason: String,
      exception: Option[Throwable]): Unit = {
    taskScheduler.taskSetsFailed += taskSet.id
  }
}

// Get the rack for a given host
/**
 * Test-only registry that maps host names to rack names. It is a single shared object, and
 * FakeTaskScheduler.getRackForHost delegates to it, so suites must call `cleanUp()` between
 * tests to avoid leaking assignments from one test into the next.
 */
object FakeRackUtil {
  private val hostToRack = new mutable.HashMap[String, String]()

  /** Forgets every host-to-rack assignment. */
  def cleanUp(): Unit = {
    hostToRack.clear()
  }

  /** Places `host` on `rack`, replacing any previous assignment for that host. */
  def assignHostToRack(host: String, rack: String): Unit = {
    hostToRack(host) = rack
  }

  /** Returns the rack assigned to `host`, or None if it has never been assigned one. */
  def getRackForHost(host: String): Option[String] = {
    hostToRack.get(host)
  }
}

/**
 * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
 * feedback received from the TaskSetManagers. Note that it's important to initialize this with
 * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
 * to work, and these are required for locality in TaskSetManager.
 *
 * Rack membership comes from [[FakeRackUtil]]. A host is recorded in `hostsByRack` only if it
 * already has a rack assigned at the time `addExecutor` runs for one of its executors.
 *
 * @param sc            the SparkContext passed to TaskSchedulerImpl
 * @param liveExecutors initial (execId, host) pairs, registered as if passed to `addExecutor`
 */
class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
  extends TaskSchedulerImpl(sc) {

  /** Indices of tasks reported as started (filled in by FakeDAGScheduler.taskStarted). */
  val startedTasks = new ArrayBuffer[Long]
  /** End reason per task index (filled in by FakeDAGScheduler.taskEnded). */
  val endedTasks = new mutable.HashMap[Long, TaskEndReason]
  /** Managers passed to `taskSetFinished`, in call order. */
  val finishedManagers = new ArrayBuffer[TaskSetManager]
  /** Ids of aborted task sets (filled in by FakeDAGScheduler.taskSetFailed). */
  val taskSetsFailed = new ArrayBuffer[String]

  /** Currently live executors, keyed by execId with the executor's host as the value. */
  val executors = new mutable.HashMap[String, String]

  // addExecutor already registers each host with its rack (if it has one), so no separate
  // rack-registration pass over liveExecutors is needed.
  liveExecutors.foreach { case (execId, host) => addExecutor(execId, host) }

  dagScheduler = new FakeDAGScheduler(sc, this)

  /**
   * Marks `execId` as dead and removes it from the host bookkeeping. A host is dropped from its
   * rack only once its last executor is gone. Otherwise `hasHostAliveOnRack` would report the
   * rack as dead while a host on it still runs executors.
   *
   * @throws AssertionError if `execId` was never registered via `addExecutor`
   */
  def removeExecutor(execId: String): Unit = {
    executors -= execId
    val hostId = executorIdToHost.getOrElse(execId,
      throw new AssertionError(s"removeExecutor called for unknown executor $execId"))
    executorIdToHost -= execId
    val executorsOnHost = hostToExecutors(hostId)
    executorsOnHost -= execId
    if (executorsOnHost.isEmpty) {
      hostToExecutors -= hostId
      for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) {
        hosts -= hostId
        if (hosts.isEmpty) {
          hostsByRack -= rack
        }
      }
    }
  }

  override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager

  override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)

  override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)

  override def hasHostAliveOnRack(rack: String): Boolean = hostsByRack.contains(rack)

  /**
   * Registers `execId` as a live executor on `host`. If FakeRackUtil assigns `host` a rack,
   * the host is also recorded as alive on that rack.
   */
  def addExecutor(execId: String, host: String): Unit = {
    executors.put(execId, host)
    hostToExecutors.getOrElseUpdate(host, new mutable.HashSet[String]) += execId
    executorIdToHost += execId -> host
    for (rack <- getRackForHost(host)) {
      hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
    }
  }

  override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
}

/**
 * A Task implementation that results in a large serialized task. It carries a payload of
 * exactly TaskSetManager.TASK_SIZE_TO_WARN_KB kilobytes, filled from a fixed-seed Random, so
 * the payload contents are the same on every run.
 */
class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {

  val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
  val random = new Random(0)
  random.nextBytes(randomBuffer)

  override def runTask(context: TaskContext): Array[Byte] = randomBuffer
  override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
}

class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
  import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}

  private val conf = new SparkConf

  val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
  val MAX_TASK_FAILURES = 4

  var sched: FakeTaskScheduler = null

  override def beforeEach(): Unit = {
    super.beforeEach()
    FakeRackUtil.cleanUp()
    sched = null
  }

  override def afterEach(): Unit = {
    super.afterEach()
    if (sched != null) {
      sched.dagScheduler.stop()
      sched.stop()
      sched = null
    }
  }


  test("TaskSet with no preferences") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet =
FakeTask.createTaskSet(1) 183 val clock = new ManualClock 184 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 185 val accumUpdates = taskSet.tasks.head.metrics.internalAccums 186 187 // Offer a host with NO_PREF as the constraint, 188 // we should get a nopref task immediately since that's what we only have 189 val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) 190 assert(taskOption.isDefined) 191 192 // Tell it the task has finished 193 manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates)) 194 assert(sched.endedTasks(0) === Success) 195 assert(sched.finishedManagers.contains(manager)) 196 } 197 198 test("multiple offers with no preferences") { 199 sc = new SparkContext("local", "test") 200 sched = new FakeTaskScheduler(sc, ("exec1", "host1")) 201 val taskSet = FakeTask.createTaskSet(3) 202 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) 203 val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => 204 task.metrics.internalAccums 205 } 206 207 // First three offers should all find tasks 208 for (i <- 0 until 3) { 209 val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) 210 assert(taskOption.isDefined) 211 val task = taskOption.get 212 assert(task.executorId === "exec1") 213 } 214 assert(sched.startedTasks.toSet === Set(0, 1, 2)) 215 216 // Re-offer the host -- now we should get no more tasks 217 assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None) 218 219 // Finish the first two tasks 220 manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdatesByTask(0))) 221 manager.handleSuccessfulTask(1, createTaskResult(1, accumUpdatesByTask(1))) 222 assert(sched.endedTasks(0) === Success) 223 assert(sched.endedTasks(1) === Success) 224 assert(!sched.finishedManagers.contains(manager)) 225 226 // Finish the last task 227 manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2))) 228 assert(sched.endedTasks(2) === Success) 229 
assert(sched.finishedManagers.contains(manager)) 230 } 231 232 test("skip unsatisfiable locality levels") { 233 sc = new SparkContext("local", "test") 234 sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2")) 235 val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB"))) 236 val clock = new ManualClock 237 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 238 239 // An executor that is not NODE_LOCAL should be rejected. 240 assert(manager.resourceOffer("execC", "host2", ANY) === None) 241 242 // Because there are no alive PROCESS_LOCAL executors, the base locality level should be 243 // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before 244 // any of the locality wait timers expire. 245 assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0) 246 } 247 248 test("basic delay scheduling") { 249 sc = new SparkContext("local", "test") 250 sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) 251 val taskSet = FakeTask.createTaskSet(4, 252 Seq(TaskLocation("host1", "exec1")), 253 Seq(TaskLocation("host2", "exec2")), 254 Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")), 255 Seq() // Last task has no locality prefs 256 ) 257 val clock = new ManualClock 258 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 259 // First offer host1, exec1: first task should be chosen 260 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) 261 assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None) 262 263 clock.advance(LOCALITY_WAIT_MS) 264 // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 3) should 265 // get chosen before the noPref task 266 assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2) 267 268 // Offer host2, exec2, at NODE_LOCAL level: we should choose task 2 269 assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1) 270 
271 // Offer host2, exec2 again, at NODE_LOCAL level: we should get noPref task 272 // after failing to find a node_Local task 273 assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None) 274 clock.advance(LOCALITY_WAIT_MS) 275 assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3) 276 } 277 278 test("we do not need to delay scheduling when we only have noPref tasks in the queue") { 279 sc = new SparkContext("local", "test") 280 sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2")) 281 val taskSet = FakeTask.createTaskSet(3, 282 Seq(TaskLocation("host1", "exec1")), 283 Seq(TaskLocation("host2", "exec3")), 284 Seq() // Last task has no locality prefs 285 ) 286 val clock = new ManualClock 287 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 288 // First offer host1, exec1: first task should be chosen 289 assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0) 290 assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1) 291 assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None) 292 assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2) 293 } 294 295 test("delay scheduling with fallback") { 296 sc = new SparkContext("local", "test") 297 sched = new FakeTaskScheduler(sc, 298 ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) 299 val taskSet = FakeTask.createTaskSet(5, 300 Seq(TaskLocation("host1")), 301 Seq(TaskLocation("host2")), 302 Seq(TaskLocation("host2")), 303 Seq(TaskLocation("host3")), 304 Seq(TaskLocation("host2")) 305 ) 306 val clock = new ManualClock 307 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 308 309 // First offer host1: first task should be chosen 310 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) 311 312 // Offer host1 again: nothing should get chosen 313 assert(manager.resourceOffer("exec1", "host1", ANY) === None) 314 315 
clock.advance(LOCALITY_WAIT_MS) 316 317 // Offer host1 again: second task (on host2) should get chosen 318 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) 319 320 // Offer host1 again: third task (on host2) should get chosen 321 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) 322 323 // Offer host2: fifth task (also on host2) should get chosen 324 assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4) 325 326 // Now that we've launched a local task, we should no longer launch the task for host3 327 assert(manager.resourceOffer("exec2", "host2", ANY) === None) 328 329 clock.advance(LOCALITY_WAIT_MS) 330 331 // After another delay, we can go ahead and launch that task non-locally 332 assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3) 333 } 334 335 test("delay scheduling with failed hosts") { 336 sc = new SparkContext("local", "test") 337 sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), 338 ("exec3", "host3")) 339 val taskSet = FakeTask.createTaskSet(3, 340 Seq(TaskLocation("host1")), 341 Seq(TaskLocation("host2")), 342 Seq(TaskLocation("host3")) 343 ) 344 val clock = new ManualClock 345 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 346 347 // First offer host1: first task should be chosen 348 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) 349 350 // After this, nothing should get chosen, because we have separated tasks with unavailable 351 // preference from the noPrefPendingTasks 352 assert(manager.resourceOffer("exec1", "host1", ANY) === None) 353 354 // Now mark host2 as dead 355 sched.removeExecutor("exec2") 356 manager.executorLost("exec2", "host2", SlaveLost()) 357 358 // nothing should be chosen 359 assert(manager.resourceOffer("exec1", "host1", ANY) === None) 360 361 clock.advance(LOCALITY_WAIT_MS * 2) 362 363 // task 1 and 2 would be scheduled as nonLocal task 364 assert(manager.resourceOffer("exec1", "host1", 
ANY).get.index === 1) 365 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) 366 367 // all finished 368 assert(manager.resourceOffer("exec1", "host1", ANY) === None) 369 assert(manager.resourceOffer("exec2", "host2", ANY) === None) 370 } 371 372 test("task result lost") { 373 sc = new SparkContext("local", "test") 374 sched = new FakeTaskScheduler(sc, ("exec1", "host1")) 375 val taskSet = FakeTask.createTaskSet(1) 376 val clock = new ManualClock 377 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 378 379 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) 380 381 // Tell it the task has finished but the result was lost. 382 manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost) 383 assert(sched.endedTasks(0) === TaskResultLost) 384 385 // Re-offer the host -- now we should get task 0 again. 386 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) 387 } 388 389 test("repeated failures lead to task set abortion") { 390 sc = new SparkContext("local", "test") 391 sched = new FakeTaskScheduler(sc, ("exec1", "host1")) 392 val taskSet = FakeTask.createTaskSet(1) 393 val clock = new ManualClock 394 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 395 396 // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted 397 // after the last failure. 
398 (1 to manager.maxTaskFailures).foreach { index => 399 val offerResult = manager.resourceOffer("exec1", "host1", ANY) 400 assert(offerResult.isDefined, 401 "Expect resource offer on iteration %s to return a task".format(index)) 402 assert(offerResult.get.index === 0) 403 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) 404 if (index < MAX_TASK_FAILURES) { 405 assert(!sched.taskSetsFailed.contains(taskSet.id)) 406 } else { 407 assert(sched.taskSetsFailed.contains(taskSet.id)) 408 } 409 } 410 } 411 412 test("executors should be blacklisted after task failure, in spite of locality preferences") { 413 val rescheduleDelay = 300L 414 val conf = new SparkConf(). 415 set(config.BLACKLIST_ENABLED, true). 416 set(config.BLACKLIST_TIMEOUT_CONF, rescheduleDelay). 417 // don't wait to jump locality levels in this test 418 set("spark.locality.wait", "0") 419 420 sc = new SparkContext("local", "test", conf) 421 // two executors on same host, one on different. 422 sched = new FakeTaskScheduler(sc, ("exec1", "host1"), 423 ("exec1.1", "host1"), ("exec2", "host2")) 424 // affinity to exec1 on host1 - which we will fail. 
425 val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1"))) 426 val clock = new ManualClock 427 val manager = new TaskSetManager(sched, taskSet, 4, clock) 428 429 { 430 val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) 431 assert(offerResult.isDefined, "Expect resource offer to return a task") 432 433 assert(offerResult.get.index === 0) 434 assert(offerResult.get.executorId === "exec1") 435 436 // Cause exec1 to fail : failure 1 437 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) 438 assert(!sched.taskSetsFailed.contains(taskSet.id)) 439 440 // Ensure scheduling on exec1 fails after failure 1 due to blacklist 441 assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty) 442 assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty) 443 assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty) 444 assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) 445 } 446 447 // Run the task on exec1.1 - should work, and then fail it on exec1.1 448 { 449 val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL) 450 assert(offerResult.isDefined, 451 "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult) 452 453 assert(offerResult.get.index === 0) 454 assert(offerResult.get.executorId === "exec1.1") 455 456 // Cause exec1.1 to fail : failure 2 457 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) 458 assert(!sched.taskSetsFailed.contains(taskSet.id)) 459 460 // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist 461 assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty) 462 } 463 464 // Run the task on exec2 - should work, and then fail it on exec2 465 { 466 val offerResult = manager.resourceOffer("exec2", "host2", ANY) 467 assert(offerResult.isDefined, "Expect resource offer to return a task") 468 469 assert(offerResult.get.index === 0) 470 
assert(offerResult.get.executorId === "exec2") 471 472 // Cause exec2 to fail : failure 3 473 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) 474 assert(!sched.taskSetsFailed.contains(taskSet.id)) 475 476 // Ensure scheduling on exec2 fails after failure 3 due to blacklist 477 assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) 478 } 479 480 // Despite advancing beyond the time for expiring executors from within the blacklist, 481 // we *never* expire from *within* the stage blacklist 482 clock.advance(rescheduleDelay) 483 484 { 485 val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) 486 assert(offerResult.isEmpty) 487 } 488 489 { 490 val offerResult = manager.resourceOffer("exec3", "host3", ANY) 491 assert(offerResult.isDefined) 492 assert(offerResult.get.index === 0) 493 assert(offerResult.get.executorId === "exec3") 494 495 assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) 496 497 // Cause exec3 to fail : failure 4 498 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) 499 } 500 501 // we have failed the same task 4 times now : task id should now be in taskSetsFailed 502 assert(sched.taskSetsFailed.contains(taskSet.id)) 503 } 504 505 test("new executors get added and lost") { 506 // Assign host2 to rack2 507 FakeRackUtil.assignHostToRack("host2", "rack2") 508 sc = new SparkContext("local", "test") 509 sched = new FakeTaskScheduler(sc) 510 val taskSet = FakeTask.createTaskSet(4, 511 Seq(TaskLocation("host1", "execA")), 512 Seq(TaskLocation("host1", "execB")), 513 Seq(TaskLocation("host2", "execC")), 514 Seq()) 515 val clock = new ManualClock 516 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 517 // Only ANY is valid 518 assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY))) 519 // Add a new executor 520 sched.addExecutor("execD", "host1") 521 manager.executorAdded() 522 // Valid locality should contain NODE_LOCAL 
and ANY 523 assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY))) 524 // Add another executor 525 sched.addExecutor("execC", "host2") 526 manager.executorAdded() 527 // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY 528 assert(manager.myLocalityLevels.sameElements( 529 Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY))) 530 // test if the valid locality is recomputed when the executor is lost 531 sched.removeExecutor("execC") 532 manager.executorLost("execC", "host2", SlaveLost()) 533 assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY))) 534 sched.removeExecutor("execD") 535 manager.executorLost("execD", "host1", SlaveLost()) 536 assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY))) 537 } 538 539 test("Executors exit for reason unrelated to currently running tasks") { 540 sc = new SparkContext("local", "test") 541 sched = new FakeTaskScheduler(sc) 542 val taskSet = FakeTask.createTaskSet(4, 543 Seq(TaskLocation("host1", "execA")), 544 Seq(TaskLocation("host1", "execB")), 545 Seq(TaskLocation("host2", "execC")), 546 Seq()) 547 val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock) 548 sched.addExecutor("execA", "host1") 549 manager.executorAdded() 550 sched.addExecutor("execC", "host2") 551 manager.executorAdded() 552 assert(manager.resourceOffer("exec1", "host1", ANY).isDefined) 553 sched.removeExecutor("execA") 554 manager.executorLost( 555 "execA", 556 "host1", 557 ExecutorExited(143, false, "Terminated for reason unrelated to running tasks")) 558 assert(!sched.taskSetsFailed.contains(taskSet.id)) 559 assert(manager.resourceOffer("execC", "host2", ANY).isDefined) 560 sched.removeExecutor("execC") 561 manager.executorLost( 562 "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks")) 563 assert(sched.taskSetsFailed.contains(taskSet.id)) 564 } 565 566 test("test RACK_LOCAL tasks") { 567 // Assign host1 to rack1 568 
FakeRackUtil.assignHostToRack("host1", "rack1") 569 // Assign host2 to rack1 570 FakeRackUtil.assignHostToRack("host2", "rack1") 571 // Assign host3 to rack2 572 FakeRackUtil.assignHostToRack("host3", "rack2") 573 sc = new SparkContext("local", "test") 574 sched = new FakeTaskScheduler(sc, 575 ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) 576 val taskSet = FakeTask.createTaskSet(2, 577 Seq(TaskLocation("host1", "execA")), 578 Seq(TaskLocation("host1", "execA"))) 579 val clock = new ManualClock 580 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 581 582 assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY))) 583 // Set allowed locality to ANY 584 clock.advance(LOCALITY_WAIT_MS * 3) 585 // Offer host3 586 // No task is scheduled if we restrict locality to RACK_LOCAL 587 assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None) 588 // Task 0 can be scheduled with ANY 589 assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0) 590 // Offer host2 591 // Task 1 can be scheduled with RACK_LOCAL 592 assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) 593 } 594 595 test("do not emit warning when serialized task is small") { 596 sc = new SparkContext("local", "test") 597 sched = new FakeTaskScheduler(sc, ("exec1", "host1")) 598 val taskSet = FakeTask.createTaskSet(1) 599 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) 600 601 assert(!manager.emittedTaskSizeWarning) 602 603 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) 604 605 assert(!manager.emittedTaskSizeWarning) 606 } 607 608 test("emit warning when serialized task is large") { 609 sc = new SparkContext("local", "test") 610 sched = new FakeTaskScheduler(sc, ("exec1", "host1")) 611 612 val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null) 613 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) 614 615 
assert(!manager.emittedTaskSizeWarning) 616 617 assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) 618 619 assert(manager.emittedTaskSizeWarning) 620 } 621 622 test("Not serializable exception thrown if the task cannot be serialized") { 623 sc = new SparkContext("local", "test") 624 sched = new FakeTaskScheduler(sc, ("exec1", "host1")) 625 626 val taskSet = new TaskSet( 627 Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) 628 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) 629 630 intercept[TaskNotSerializableException] { 631 manager.resourceOffer("exec1", "host1", ANY) 632 } 633 assert(manager.isZombie) 634 } 635 636 test("abort the job if total size of results is too large") { 637 val conf = new SparkConf().set("spark.driver.maxResultSize", "2m") 638 sc = new SparkContext("local", "test", conf) 639 640 def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) => 641 val bytes = Array.ofDim[Byte](size) 642 scala.util.Random.nextBytes(bytes) 643 bytes 644 } 645 646 // multiple 1k result 647 val r = sc.makeRDD(0 until 10, 10).map(genBytes(1024)).collect() 648 assert(10 === r.size) 649 650 // single 10M result 651 val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()} 652 assert(thrown.getMessage().contains("bigger than spark.driver.maxResultSize")) 653 654 // multiple 1M results 655 val thrown2 = intercept[SparkException] { 656 sc.makeRDD(0 until 10, 10).map(genBytes(1 << 20)).collect() 657 } 658 assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize")) 659 } 660 661 test("speculative and noPref task should be scheduled after node-local") { 662 sc = new SparkContext("local", "test") 663 sched = new FakeTaskScheduler( 664 sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) 665 val taskSet = FakeTask.createTaskSet(4, 666 Seq(TaskLocation("host1", "execA")), 667 Seq(TaskLocation("host2"), TaskLocation("host1")), 668 Seq(), 
669 Seq(TaskLocation("host3", "execC"))) 670 val clock = new ManualClock 671 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 672 673 assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0) 674 assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) 675 assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) 676 677 manager.speculatableTasks += 1 678 clock.advance(LOCALITY_WAIT_MS) 679 // schedule the nonPref task 680 assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) 681 // schedule the speculative task 682 assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1) 683 clock.advance(LOCALITY_WAIT_MS * 3) 684 // schedule non-local tasks 685 assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) 686 } 687 688 test("node-local tasks should be scheduled right away " + 689 "when there are only node-local and no-preference tasks") { 690 sc = new SparkContext("local", "test") 691 sched = new FakeTaskScheduler( 692 sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3")) 693 val taskSet = FakeTask.createTaskSet(4, 694 Seq(TaskLocation("host1")), 695 Seq(TaskLocation("host2")), 696 Seq(), 697 Seq(TaskLocation("host3"))) 698 val clock = new ManualClock 699 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) 700 701 // node-local tasks are scheduled without delay 702 assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) 703 assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1) 704 assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3) 705 assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None) 706 707 // schedule no-preference after node local ones 708 assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2) 709 } 710 711 test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") 712 { 713 
sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
    // Tasks 0 and 1 prefer only a host; tasks 2 and 3 prefer a specific executor on that host.
    val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
      Seq(ExecutorCacheTaskLocation("host1", "execA")),
      Seq(ExecutorCacheTaskLocation("host2", "execB")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // process-local tasks are scheduled first
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3)
    // node-local tasks are scheduled without delay
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1)
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).isEmpty)
    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).isEmpty)
  }

  test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
    // Task 0 has no locality preference; tasks 1 and 2 prefer specific executors.
    val taskSet = FakeTask.createTaskSet(3,
      Seq(),
      Seq(ExecutorCacheTaskLocation("host1", "execA")),
      Seq(ExecutorCacheTaskLocation("host2", "execB")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // process-local tasks are scheduled first
    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
    assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2)
    // no-pref tasks are scheduled without delay
    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).isEmpty)
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).isEmpty)
    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0)
    assert(manager.resourceOffer("execA", "host1", ANY).isEmpty)
  }

  test("Ensure TaskSetManager is usable after addition of levels") {
    // Regression test for SPARK-2931
    sc = new SparkContext("local", "test")
    // Start with no live executors, so none of the preferred locations are alive yet.
    sched = new FakeTaskScheduler(sc)
    val taskSet = FakeTask.createTaskSet(2,
      Seq(TaskLocation("host1", "execA")),
      Seq(TaskLocation("host2", "execB.1")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    // Only ANY is valid
    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
    // Add a new executor
    sched.addExecutor("execA", "host1")
    sched.addExecutor("execB.2", "host2")
    manager.executorAdded()
    assert(manager.pendingTasksWithNoPrefs.size === 0)
    // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    assert(manager.resourceOffer("execA", "host1", ANY).isDefined)
    clock.advance(LOCALITY_WAIT_MS * 4)
    assert(manager.resourceOffer("execB.2", "host2", ANY).isDefined)
    // Remove both executors so the locality levels shrink back before a new executor arrives.
    sched.removeExecutor("execA")
    sched.removeExecutor("execB.2")
    manager.executorLost("execA", "host1", SlaveLost())
    manager.executorLost("execB.2", "host2", SlaveLost())
    clock.advance(LOCALITY_WAIT_MS * 4)
    sched.addExecutor("execC", "host3")
    manager.executorAdded()
    // Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:
    assert(manager.resourceOffer("execC", "host3", ANY).isDefined)
  }

  test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") {
    // NOTE(review): this originally said "Regression test for SPARK-2931", which appears to be
    // copied from the previous test; this test covers HDFSCacheTaskLocation locality levels.
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc,
      ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
    val taskSet = FakeTask.createTaskSet(3,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
      Seq(TaskLocation("hdfs_cache_host3")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    // Levels stay unchanged while at least one preferred host still has a live executor.
    sched.removeExecutor("execA")
    manager.executorAdded()
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    sched.removeExecutor("execB")
    manager.executorAdded()
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    // Once every preferred host is gone, only ANY remains.
    sched.removeExecutor("execC")
    manager.executorAdded()
    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
  }

  test("Test TaskLocation for different host type.") {
    assert(TaskLocation("host1") === HostTaskLocation("host1"))
    assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
    assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
    assert(TaskLocation("executor_some.host1_executor_task_3") ===
      ExecutorCacheTaskLocation("some.host1", "executor_task_3"))
  }

  test("Kill other task attempts when one attempt belonging to the same task succeeds") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    val taskSet = FakeTask.createTaskSet(4)
    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
    sc.conf.set("spark.speculation.multiplier", "0.0")
    val clock = new ManualClock()
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
      task.metrics.internalAccums
    }
    // Offer resources for 4 tasks to start
    for ((k, v) <- List(
        "exec1" -> "host1",
        "exec1" -> "host1",
        "exec2" -> "host2",
        "exec2" -> "host2")) {
      val taskOption = manager.resourceOffer(k, v, NO_PREF)
      assert(taskOption.isDefined)
      val task = taskOption.get
      assert(task.executorId === k)
    }
    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    // Complete the 3 tasks and leave 1 task in running
    for (id <- Set(0, 1, 2)) {
      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
      assert(sched.endedTasks(id) === Success)
    }

    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    // > 0ms, so advance the clock by 1ms here.
    clock.advance(1)
    assert(manager.checkSpeculatableTasks(0))
    // Offer resource to start the speculative attempt for the running task
    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOption5.isDefined)
    val task5 = taskOption5.get
    assert(task5.index === 3)
    assert(task5.taskId === 4)
    assert(task5.executorId === "exec1")
    assert(task5.attemptNumber === 1)
    sched.backend = mock(classOf[SchedulerBackend])
    // Complete the speculative attempt for the running task
    manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))
    // Verify that it kills other running attempt
    verify(sched.backend).killTask(3, "exec2", true)
    // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
    // killed, so the FakeTaskScheduler is only told about the successful completion
    // of the speculated task.
    assert(sched.endedTasks(3) === Success)
  }

  test("Killing speculative tasks does not count towards aborting the taskset") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    val taskSet = FakeTask.createTaskSet(5)
    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
    sc.conf.set("spark.speculation.multiplier", "0.0")
    sc.conf.set("spark.speculation.quantile", "0.6")
    val clock = new ManualClock()
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
      task.metrics.internalAccums
    }
    // Offer resources for 5 tasks to start
    val tasks = new ArrayBuffer[TaskDescription]()
    for ((k, v) <- List(
        "exec1" -> "host1",
        "exec1" -> "host1",
        "exec1" -> "host1",
        "exec2" -> "host2",
        "exec2" -> "host2")) {
      val taskOption = manager.resourceOffer(k, v, NO_PREF)
      assert(taskOption.isDefined)
      val task = taskOption.get
      assert(task.executorId === k)
      tasks += task
    }
    assert(sched.startedTasks.toSet === (0 until 5).toSet)
    // Complete 3 tasks and leave 2 tasks in running
    for (id <- Set(0, 1, 2)) {
      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
      assert(sched.endedTasks(id) === Success)
    }

    // Finds the not-yet-ended attempt for the given partition index.
    // NOTE(review): FakeDAGScheduler.taskEnded keys endedTasks by task index, while this helper
    // and the failure loop below key it by taskId; confirm the two key spaces never collide for
    // an attempt that is still running.
    def runningTaskForIndex(index: Int): TaskDescription = {
      tasks.find { task =>
        task.index == index && !sched.endedTasks.contains(task.taskId)
      }.getOrElse {
        throw new RuntimeException(s"couldn't find index $index in " +
          s"tasks: ${tasks.map { t => t.index -> t.taskId }} with endedTasks:" +
          s" ${sched.endedTasks.keys}")
      }
    }

    // have each of the running tasks fail 3 times (not enough to abort the stage)
    (0 until 3).foreach { attempt =>
      Seq(3, 4).foreach { index =>
        val task = runningTaskForIndex(index)
        logInfo(s"failing task $task")
        val endReason = ExceptionFailure("a", "b", Array(), "c", None)
        manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason)
        sched.endedTasks(task.taskId) = endReason
        assert(!manager.isZombie)
        val nextTask = manager.resourceOffer("exec2", "host2", NO_PREF)
        assert(nextTask.isDefined, s"no offer for attempt $attempt of $index")
        tasks += nextTask.get
      }
    }

    // we can't be sure which one of our running tasks will get another speculative copy
    val originalTasks = Seq(3, 4).map { index => index -> runningTaskForIndex(index) }.toMap

    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    // > 0ms, so advance the clock by 1ms here.
    clock.advance(1)
    assert(manager.checkSpeculatableTasks(0))
    // Offer resource to start the speculative attempt for the running task
    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOption5.isDefined)
    val speculativeTask = taskOption5.get
    assert(speculativeTask.index === 3 || speculativeTask.index === 4)
    assert(speculativeTask.taskId === 11)
    assert(speculativeTask.executorId === "exec1")
    assert(speculativeTask.attemptNumber === 4)
    sched.backend = mock(classOf[SchedulerBackend])
    // Complete the speculative attempt for the running task. The copy may be for index 3 or 4,
    // so use its actual index instead of assuming 3.
    manager.handleSuccessfulTask(speculativeTask.taskId,
      createTaskResult(speculativeTask.index, accumUpdatesByTask(speculativeTask.index)))
    // Verify that it kills other running attempt
    val origTask = originalTasks(speculativeTask.index)
    verify(sched.backend).killTask(origTask.taskId, "exec2", true)
    // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
    // killed, so the FakeTaskScheduler is only told about the successful completion
    // of the speculated task.
    assert(sched.endedTasks(speculativeTask.index) === Success)
    // also because the scheduler is a mock, our manager isn't notified about the task killed event,
    // so we do that manually
    manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled)
    // this task has "failed" 4 times, but one of them doesn't count, so keep running the stage
    assert(manager.tasksSuccessful === 4)
    assert(!manager.isZombie)

    // now run another speculative task
    val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOpt6.isDefined)
    val speculativeTask2 = taskOpt6.get
    assert(speculativeTask2.index === 3 || speculativeTask2.index === 4)
    assert(speculativeTask2.index !== speculativeTask.index)
    assert(speculativeTask2.attemptNumber === 4)
    // Complete the speculative attempt for the running task
    manager.handleSuccessfulTask(speculativeTask2.taskId,
      createTaskResult(speculativeTask2.index, accumUpdatesByTask(speculativeTask2.index)))
    // Verify that it kills other running attempt
    val origTask2 = originalTasks(speculativeTask2.index)
    verify(sched.backend).killTask(origTask2.taskId, "exec2", true)
    assert(manager.tasksSuccessful === 5)
    assert(manager.isZombie)
  }

  test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
    assert(manager.name === "TaskSet_0.0")

    // Make sure a task set with the same stage ID but different attempt ID has a unique name
    val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
    assert(manager2.name === "TaskSet_0.1")

    // Make sure a task set with the same attempt ID but different stage ID also has a unique name
    val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
    val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
    assert(manager3.name === "TaskSet_1.1")
  }

  /**
   * Builds a successful task result whose value is `id`, serialized with the active
   * SparkEnv's serializer.
   *
   * @param id value to serialize as the task's result
   * @param accumUpdates accumulator updates to attach to the result (empty by default)
   */
  private def createTaskResult(
      id: Int,
      accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
    val valueSer = SparkEnv.get.serializer.newInstance()
    new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
  }
}