/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.app;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Test;

/**
 * Drives a {@link DefaultSpeculator} with a {@link TaskRuntimeEstimator}
 * against a mock job whose attempts compute their progress from a
 * {@link MockClock}, and checks how many speculative attempts finish before
 * the attempt they were launched to back up.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRuntimeEstimators {

  private static final int INITIAL_NUMBER_FREE_SLOTS = 600;
  private static final int MAP_SLOT_REQUIREMENT = 3;
  // this has to be at least as much as map slot requirement
  private static final int REDUCE_SLOT_REQUIREMENT = 4;
  private static final int MAP_TASKS = 200;
  private static final int REDUCE_TASKS = 150;

  // Wall-clock limit, in milliseconds, for one wait on the speculator's
  // event queue to drain.
  private static final long EVENT_QUEUE_DRAIN_TIMEOUT_MS = 130000L;

  MockClock clock;

  Job myJob;

  AppContext myAppContext;

  private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);

  private final AtomicInteger slotsInUse = new AtomicInteger(0);

  AsyncDispatcher dispatcher;

  DefaultSpeculator speculator;

  TaskRuntimeEstimator estimator;

  // This is a huge kluge.  The real implementations have a decent approach
  private final AtomicInteger completedMaps = new AtomicInteger(0);
  private final AtomicInteger completedReduces = new AtomicInteger(0);

  private final AtomicInteger successfulSpeculations
      = new AtomicInteger(0);
  private final AtomicLong taskTimeSavedBySpeculation
      = new AtomicLong(0L);

  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  /**
   * Runs the mock job to completion under the given estimator and asserts
   * the number of speculative attempts that beat their original attempt.
   *
   * @param testedEstimator the estimator handed to the speculator
   * @param expectedSpeculations the expected value of the
   *        successful-speculation counter once every task has finished
   */
  private void coreTestEstimator
      (TaskRuntimeEstimator testedEstimator, int expectedSpeculations) {
    estimator = testedEstimator;
    clock = new MockClock();
    dispatcher = new AsyncDispatcher();
    myJob = null;
    slotsInUse.set(0);
    completedMaps.set(0);
    completedReduces.set(0);
    successfulSpeculations.set(0);
    taskTimeSavedBySpeculation.set(0);

    clock.advanceTime(1000);

    Configuration conf = new Configuration();

    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
    myJob = myAppContext.getAllJobs().values().iterator().next();

    estimator.contextualize(conf, myAppContext);

    conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, 500L);
    conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE, 5000L);
    conf.setDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS, 0.1);
    conf.setDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS, 0.001);
    conf.setInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS, 5);
    speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
    Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_NO_SPECULATE value",
        500L, speculator.getSoonestRetryAfterNoSpeculate());
    Assert.assertEquals("wrong SPECULATIVE_RETRY_AFTER_SPECULATE value",
        5000L, speculator.getSoonestRetryAfterSpeculate());
    Assert.assertEquals("wrong SPECULATIVECAP_RUNNING_TASKS value",
        0.1, speculator.getProportionRunningTasksSpeculatable(), 0.00001);
    Assert.assertEquals("wrong SPECULATIVECAP_TOTAL_TASKS value",
        0.001, speculator.getProportionTotalTasksSpeculatable(), 0.00001);
    Assert.assertEquals("wrong SPECULATIVE_MINIMUM_ALLOWED_TASKS value",
        5, speculator.getMinimumAllowedSpeculativeTasks());

    dispatcher.register(Speculator.EventType.class, speculator);

    dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());

    dispatcher.init(conf);
    dispatcher.start();

    speculator.init(conf);
    speculator.start();

    try {
      // Now that the plumbing is hooked up, we do the following:
      //  do until all tasks are finished, ...
      //  1: If we have spare capacity, start every NEW attempt that fits.
      //      Maps are considered first because they precede the reduces in
      //      the task sequence; a map attempt takes MAP_SLOT_REQUIREMENT
      //      slots and a reduce attempt REDUCE_SLOT_REQUIREMENT slots.
      //  2: Send a speculation event for every task attempt that's running
      //  note that new attempts might get started by the speculator

      // discover undone tasks
      int undoneMaps = MAP_TASKS;
      int undoneReduces = REDUCE_TASKS;

      // build a task sequence where all the maps precede any of the reduces
      List<Task> allTasksSequence = new LinkedList<Task>();

      allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values());
      allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values());

      while (undoneMaps + undoneReduces > 0) {
        undoneMaps = 0;
        undoneReduces = 0;
        // start all attempts which are new but for which there is enough slots
        for (Task task : allTasksSequence) {
          if (!task.isFinished()) {
            if (task.getType() == TaskType.MAP) {
              ++undoneMaps;
            } else {
              ++undoneReduces;
            }
          }
          for (TaskAttempt attempt : task.getAttempts().values()) {
            if (attempt.getState() == TaskAttemptState.NEW
                && INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get()
                    >= taskTypeSlots(task.getType())) {
              MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
              SpeculatorEvent event
                  = new SpeculatorEvent(attempt.getID(), false, clock.getTime());
              speculator.handle(event);
              attemptImpl.startUp();
            } else {
              // If a task attempt is in progress we should send the news to
              // the Speculator.
              TaskAttemptStatus status = new TaskAttemptStatus();
              status.id = attempt.getID();
              status.progress = attempt.getProgress();
              status.stateString = attempt.getState().name();
              status.taskState = attempt.getState();
              SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
              speculator.handle(event);
            }
          }
        }

        drainSpeculatorEventQueue();

        clock.advanceTime(1000L);

        if (clock.getTime() % 10000L == 0L) {
          speculator.scanForSpeculations();
        }
      }

      Assert.assertEquals("We got the wrong number of successful speculations.",
          expectedSpeculations, successfulSpeculations.get());
    } finally {
      // Both services were started above; stop them so that their threads do
      // not outlive this test method.
      speculator.stop();
      dispatcher.stop();
    }
  }

  /**
   * Spins until the speculator reports an empty event queue.  Fails the test
   * if that takes longer than {@link #EVENT_QUEUE_DRAIN_TIMEOUT_MS} of wall
   * clock time, so that a stuck speculator cannot make the test pass without
   * its final assertion ever being evaluated.
   */
  private void drainSpeculatorEventQueue() {
    long startTime = System.currentTimeMillis();

    while (!speculator.eventQueueEmpty()) {
      Thread.yield();
      if (System.currentTimeMillis() > startTime + EVENT_QUEUE_DRAIN_TIMEOUT_MS) {
        Assert.fail("Timed out after " + EVENT_QUEUE_DRAIN_TIMEOUT_MS
            + " ms waiting for the speculator event queue to drain.");
      }
    }
  }

  @Test
  public void testLegacyEstimator() throws Exception {
    TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
    coreTestEstimator(specificEstimator, 3);
  }

  @Test
  public void testExponentialEstimator() throws Exception {
    TaskRuntimeEstimator specificEstimator
        = new ExponentiallySmoothedTaskRuntimeEstimator();
    coreTestEstimator(specificEstimator, 3);
  }

  /**
   * @param type the task type
   * @return the number of slots one attempt of the given type occupies
   */
  int taskTypeSlots(TaskType type) {
    return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
  }

  /**
   * Receives the task events routed through the dispatcher, checks that each
   * one is a {@code T_ADD_SPEC_ATTEMPT} request, and adds another attempt to
   * the task the event names.
   */
  class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {

    @Override
    public void handle(TaskEvent event) {
      TaskId taskID = event.getTaskID();
      Task task = myJob.getTask(taskID);

      Assert.assertEquals
          ("Wrong type event", TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType());

      System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);

      addAttempt(task);
    }
  }

  /**
   * Adds one more attempt to the given task.
   *
   * @param task the task to extend; must be a {@link MyTaskImpl}
   */
  void addAttempt(Task task) {
    MyTaskImpl myTask = (MyTaskImpl) task;

    myTask.addAttempt();
  }

  /**
   * Mock task.  It only tracks its attempts; the operations the test does
   * not exercise throw {@link UnsupportedOperationException}.
   */
  class MyTaskImpl implements Task {
    private final TaskId taskID;
    private final Map<TaskAttemptId, TaskAttempt> attempts
        = new ConcurrentHashMap<TaskAttemptId, TaskAttempt>(4);

    MyTaskImpl(JobId jobID, int index, TaskType type) {
      taskID = recordFactory.newRecordInstance(TaskId.class);
      taskID.setId(index);
      taskID.setTaskType(type);
      taskID.setJobId(jobID);
    }

    /**
     * Creates a new attempt whose index is the current number of attempts,
     * registers it, and sends the dispatcher a {@link SpeculatorEvent} for
     * this task with a count of {@code +1}.
     */
    void addAttempt() {
      TaskAttempt taskAttempt
          = new MyTaskAttemptImpl(taskID, attempts.size(), clock);
      TaskAttemptId taskAttemptID = taskAttempt.getID();

      attempts.put(taskAttemptID, taskAttempt);

      System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID());

      SpeculatorEvent event = new SpeculatorEvent(taskID, +1);
      dispatcher.getEventHandler().handle(event);
    }

    @Override
    public TaskId getID() {
      return taskID;
    }

    @Override
    public TaskReport getReport() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Counters getCounters() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * @return the largest progress among this task's attempts, or
     *         {@code 0.0F} when there are none
     */
    @Override
    public float getProgress() {
      float result = 0.0F;

      for (TaskAttempt attempt : attempts.values()) {
        result = Math.max(result, attempt.getProgress());
      }

      return result;
    }

    @Override
    public TaskType getType() {
      return taskID.getTaskType();
    }

    /**
     * @return a copy of the attempt map, so callers can iterate over it
     *         while further attempts are being added
     */
    @Override
    public Map<TaskAttemptId, TaskAttempt> getAttempts() {
      Map<TaskAttemptId, TaskAttempt> result
          = new HashMap<TaskAttemptId, TaskAttempt>(attempts.size());
      result.putAll(attempts);
      return result;
    }

    @Override
    public TaskAttempt getAttempt(TaskAttemptId attemptID) {
      return attempts.get(attemptID);
    }

    /**
     * @return {@code true} once any attempt reports
     *         {@link TaskAttemptState#SUCCEEDED}
     */
    @Override
    public boolean isFinished() {
      for (TaskAttempt attempt : attempts.values()) {
        if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
          return true;
        }
      }

      return false;
    }

    @Override
    public boolean canCommit(TaskAttemptId taskAttemptID) {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public TaskState getState() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

  }

  /**
   * Mock job holding a fixed set of map and reduce tasks, each created with
   * one attempt.
   */
  class MyJobImpl implements Job {
    private final JobId jobID;
    private final Map<TaskId, Task> allTasks = new HashMap<TaskId, Task>();
    private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
    private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();

    MyJobImpl(JobId jobID, int numMaps, int numReduces) {
      this.jobID = jobID;
      for (int i = 0; i < numMaps; ++i) {
        Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP);
        mapTasks.put(newTask.getID(), newTask);
        allTasks.put(newTask.getID(), newTask);
      }
      for (int i = 0; i < numReduces; ++i) {
        Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE);
        reduceTasks.put(newTask.getID(), newTask);
        allTasks.put(newTask.getID(), newTask);
      }

      // give every task an attempt
      for (Task task : allTasks.values()) {
        MyTaskImpl myTaskImpl = (MyTaskImpl) task;
        myTaskImpl.addAttempt();
      }
    }

    @Override
    public JobId getID() {
      return jobID;
    }

    @Override
    public JobState getState() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public JobReport getReport() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public float getProgress() {
      return 0;
    }

    @Override
    public Counters getAllCounters() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Map<TaskId, Task> getTasks() {
      return allTasks;
    }

    @Override
    public Map<TaskId, Task> getTasks(TaskType taskType) {
      return taskType == TaskType.MAP ? mapTasks : reduceTasks;
    }

    @Override
    public Task getTask(TaskId taskID) {
      return allTasks.get(taskID);
    }

    @Override
    public List<String> getDiagnostics() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public int getCompletedMaps() {
      return completedMaps.get();
    }

    @Override
    public int getCompletedReduces() {
      return completedReduces.get();
    }

    @Override
    public TaskAttemptCompletionEvent[]
        getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public TaskCompletionEvent[]
        getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public String getName() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public String getQueueName() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public int getTotalMaps() {
      return mapTasks.size();
    }

    @Override
    public int getTotalReduces() {
      return reduceTasks.size();
    }

    @Override
    public boolean isUber() {
      return false;
    }

    @Override
    public boolean checkAccess(UserGroupInformation callerUGI,
        JobACL jobOperation) {
      return true;
    }

    @Override
    public String getUserName() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Path getConfFile() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Map<JobACL, AccessControlList> getJobACLs() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public List<AMInfo> getAMInfos() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Configuration loadConfFile() {
      throw new UnsupportedOperationException();
    }

    @Override
    public void setQueueName(String queueName) {
      // do nothing
    }
  }

  /*
   * We follow the pattern of the real XxxImpl .  We create a job and initialize
   * it with a full suite of tasks which in turn have one attempt each in the
   * NEW state.  Attempts transition only from NEW to RUNNING to SUCCEEDED .
   * The one exception is made in getState(): when a speculative attempt
   * succeeds while an older attempt is still running, the older attempt is
   * forced to KILLED .
   */
  class MyTaskAttemptImpl implements TaskAttempt {
    private final TaskAttemptId myAttemptID;

    long startMockTime = Long.MIN_VALUE;

    long shuffleCompletedTime = Long.MAX_VALUE;

    // When non-null this is the state reported by getState(); when null the
    // state is derived from the current progress.
    TaskAttemptState overridingState = TaskAttemptState.NEW;

    /**
     * @param taskID the task this attempt belongs to
     * @param index the attempt's id within the task
     * @param clock not read by this constructor; the attempt takes its time
     *        from the enclosing test's clock field
     */
    MyTaskAttemptImpl(TaskId taskID, int index, Clock clock) {
      myAttemptID = recordFactory.newRecordInstance(TaskAttemptId.class);
      myAttemptID.setId(index);
      myAttemptID.setTaskId(taskID);
    }

    /**
     * Moves the attempt out of the NEW state: records the start time, claims
     * the slots for its task type, and sends the dispatcher a
     * {@link SpeculatorEvent} for its task with a count of {@code -1}.
     */
    void startUp() {
      startMockTime = clock.getTime();
      overridingState = null;

      slotsInUse.addAndGet(taskTypeSlots(myAttemptID.getTaskId().getTaskType()));

      System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID());

      SpeculatorEvent event = new SpeculatorEvent(getID().getTaskId(), -1);
      dispatcher.getEventHandler().handle(event);
    }

    @Override
    public NodeId getNodeId() throws UnsupportedOperationException{
      throw new UnsupportedOperationException();
    }

    @Override
    public TaskAttemptId getID() {
      return myAttemptID;
    }

    @Override
    public TaskAttemptReport getReport() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public List<String> getDiagnostics() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Counters getCounters() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public int getShufflePort() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * Returns the nominal runtime of this attempt, chosen from the task
     * index: 150 when the index is 1 mod 4, 250 when it is 3 mod 4, and 200
     * otherwise, except that the first attempt (attempt index 0) of every
     * task whose index is a multiple of 40 gets 600.  The progress methods
     * scale this value by 1000 (maps) or 2000 (reduces) before comparing it
     * with elapsed mock-clock time.
     */
    private float getCodeRuntime() {
      int taskIndex = myAttemptID.getTaskId().getId();
      int attemptIndex = myAttemptID.getId();

      float result = 200.0F;

      switch (taskIndex % 4) {
      case 0:
        if (taskIndex % 40 == 0 && attemptIndex == 0) {
          result = 600.0F;
        }
        break;

      case 1:
        result = 150.0F;
        break;

      case 3:
        result = 250.0F;
        break;

      default:
        break;
      }

      return result;
    }

    /**
     * @return the elapsed mock time since start-up divided by
     *         {@code runtime * 1000}, capped at {@code 1.0F}
     */
    private float getMapProgress() {
      float runtime = getCodeRuntime();

      return Math.min
          ((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F);
    }

    /**
     * Until every map task of the job is finished, progress is half the
     * fraction of finished maps.  From the first call that observes all maps
     * finished, progress starts at {@code 0.5F} and grows with the mock time
     * elapsed since that call, capped at {@code 1.0F}.
     */
    private float getReduceProgress() {
      Job job = myAppContext.getJob(myAttemptID.getTaskId().getJobId());
      float runtime = getCodeRuntime();

      Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();

      int numberMaps = allMapTasks.size();
      int numberDoneMaps = 0;

      for (Task mapTask : allMapTasks) {
        if (mapTask.isFinished()) {
          ++numberDoneMaps;
        }
      }

      if (numberMaps == numberDoneMaps) {
        shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());

        return Math.min
            ((float) (clock.getTime() - shuffleCompletedTime)
                        / (runtime * 2000.0F) + 0.5F,
             1.0F);
      } else {
        return ((float) numberDoneMaps) / numberMaps * 0.5F;
      }
    }

    // we compute progress from time and an algorithm now
    @Override
    public float getProgress() {
      if (overridingState == TaskAttemptState.NEW) {
        return 0.0F;
      }
      return myAttemptID.getTaskId().getTaskType() == TaskType.MAP
          ? getMapProgress() : getReduceProgress();
    }

    @Override
    public Phase getPhase() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * Returns the overriding state when one is set; otherwise RUNNING or
     * SUCCEEDED depending on whether progress has reached {@code 1.0F}.
     *
     * The first call that observes success has side effects: it pins the
     * state to SUCCEEDED, releases this attempt's slots, bumps the completed
     * map/reduce counter, and, for each sibling attempt still RUNNING that
     * has a lower attempt id, counts a successful speculation, adds the
     * estimated saving to the time-saved total, releases that sibling's
     * slots and forces it to KILLED.
     */
    @Override
    public TaskAttemptState getState() {
      if (overridingState != null) {
        return overridingState;
      }
      TaskAttemptState result
          = getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED;

      if (result == TaskAttemptState.SUCCEEDED) {
        // Set first, so the sibling getState() calls below cannot re-enter
        // this branch for this attempt.
        overridingState = TaskAttemptState.SUCCEEDED;

        System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished.");

        slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));

        (myAttemptID.getTaskId().getTaskType() == TaskType.MAP
            ? completedMaps : completedReduces).getAndIncrement();

        // check for a spectacularly successful speculation
        TaskId taskID = myAttemptID.getTaskId();

        Task task = myJob.getTask(taskID);

        for (TaskAttempt otherAttempt : task.getAttempts().values()) {
          if (otherAttempt != this
              && otherAttempt.getState() == TaskAttemptState.RUNNING) {
            // we had two instances running.  Try to determine how much
            //  we might have saved by speculation
            if (getID().getId() > otherAttempt.getID().getId()) {
              // the speculation won
              successfulSpeculations.getAndIncrement();
              float hisProgress = otherAttempt.getProgress();
              long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime;
              System.out.println("TLTRE:A speculation finished at time "
                  + clock.getTime()
                  + ".  The stalled attempt is at " + (hisProgress * 100.0)
                  + "% progress, and it started at "
                  + hisStartTime + ", which is "
                  + (clock.getTime() - hisStartTime) + " ago.");
              long originalTaskEndEstimate
                  = (hisStartTime
                      + estimator.estimatedRuntime(otherAttempt.getID()));
              System.out.println(
                  "TLTRE: We would have expected the original attempt to take "
                  + estimator.estimatedRuntime(otherAttempt.getID())
                  + ", finishing at " + originalTaskEndEstimate);
              long estimatedSavings = originalTaskEndEstimate - clock.getTime();
              taskTimeSavedBySpeculation.addAndGet(estimatedSavings);
              System.out.println("TLTRE: The task is " + task.getID());
              // Release the slots held by the attempt that is being killed.
              slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
              ((MyTaskAttemptImpl)otherAttempt).overridingState
                  = TaskAttemptState.KILLED;
            } else {
              System.out.println(
                  "TLTRE: The normal attempt beat the speculation in "
                  + task.getID());
            }
          }
        }
      }

      return result;
    }

    @Override
    public boolean isFinished() {
      return getProgress() == 1.0F;
    }

    @Override
    public ContainerId getAssignedContainerID() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public String getNodeHttpAddress() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public String getNodeRackName() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public long getLaunchTime() {
      return startMockTime;
    }

    @Override
    public long getFinishTime() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public long getShuffleFinishTime() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public long getSortFinishTime() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public String getAssignedContainerMgrAddress() {
      throw new UnsupportedOperationException("Not supported yet.");
    }
  }

  /**
   * Clock whose time only changes when the test sets or advances it.
   */
  static class MockClock implements Clock {
    private long currentTime = 0;

    @Override
    public long getTime() {
      return currentTime;
    }

    void setMeasuredTime(long newTime) {
      currentTime = newTime;
    }

    void advanceTime(long increment) {
      currentTime += increment;
    }
  }

  /**
   * Minimal composite service that remembers a clock, falling back to a
   * {@link SystemClock} when none is supplied.
   */
  class MyAppMaster extends CompositeService {
    final Clock clock;

    public MyAppMaster(Clock clock) {
      super(MyAppMaster.class.getName());
      if (clock == null) {
        clock = new SystemClock();
      }
      this.clock = clock;
      LOG.info("Created MyAppMaster");
    }
  }

  /**
   * Mock application context exposing a single {@link MyJobImpl} together
   * with the enclosing test's dispatcher and clock.
   */
  class MyAppContext implements AppContext {
    private final ApplicationAttemptId myAppAttemptID;
    private final ApplicationId myApplicationID;
    private final JobId myJobID;
    private final Map<JobId, Job> allJobs;

    MyAppContext(int numberMaps, int numberReduces) {
      myApplicationID = ApplicationId.newInstance(clock.getTime(), 1);

      myAppAttemptID = ApplicationAttemptId.newInstance(myApplicationID, 0);
      myJobID = recordFactory.newRecordInstance(JobId.class);
      myJobID.setAppId(myApplicationID);

      Job job
          = new MyJobImpl(myJobID, numberMaps, numberReduces);

      allJobs = Collections.singletonMap(myJobID, job);
    }

    @Override
    public ApplicationAttemptId getApplicationAttemptId() {
      return myAppAttemptID;
    }

    @Override
    public ApplicationId getApplicationID() {
      return myApplicationID;
    }

    @Override
    public Job getJob(JobId jobID) {
      return allJobs.get(jobID);
    }

    @Override
    public Map<JobId, Job> getAllJobs() {
      return allJobs;
    }

    @Override
    public EventHandler getEventHandler() {
      return dispatcher.getEventHandler();
    }

    @Override
    public CharSequence getUser() {
      throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public Clock getClock() {
      return clock;
    }

    @Override
    public String getApplicationName() {
      return null;
    }

    @Override
    public long getStartTime() {
      return 0;
    }

    @Override
    public ClusterInfo getClusterInfo() {
      return new ClusterInfo();
    }

    @Override
    public Set<String> getBlacklistedNodes() {
      return null;
    }

    @Override
    public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
      return null;
    }

    @Override
    public boolean isLastAMRetry() {
      return false;
    }

    @Override
    public boolean hasSuccessfullyUnregistered() {
      // bogus - Not Required
      return true;
    }

    @Override
    public String getNMHostname() {
      // bogus - Not Required
      return null;
    }
  }
}