1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.nodemanager; 20 21 import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse; 22 import static org.mockito.Mockito.mock; 23 import static org.mockito.Mockito.when; 24 25 import java.io.EOFException; 26 import java.io.File; 27 import java.io.IOException; 28 import java.net.InetAddress; 29 import java.net.InetSocketAddress; 30 import java.net.UnknownHostException; 31 import java.nio.ByteBuffer; 32 import java.util.ArrayList; 33 import java.util.Collections; 34 import java.util.HashMap; 35 import java.util.HashSet; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.Set; 40 import java.util.concurrent.ConcurrentMap; 41 import java.util.concurrent.CountDownLatch; 42 import java.util.concurrent.CyclicBarrier; 43 import java.util.concurrent.ExecutorService; 44 import java.util.concurrent.Executors; 45 import java.util.concurrent.TimeUnit; 46 import java.util.concurrent.atomic.AtomicBoolean; 47 import java.util.concurrent.atomic.AtomicInteger; 48 49 import org.apache.commons.logging.Log; 50 import org.apache.commons.logging.LogFactory; 51 import org.apache.hadoop.conf.Configuration; 52 import org.apache.hadoop.fs.FileContext; 53 import org.apache.hadoop.fs.Path; 54 import org.apache.hadoop.io.DataOutputBuffer; 55 import org.apache.hadoop.io.Text; 56 import org.apache.hadoop.io.retry.RetryPolicy; 57 import org.apache.hadoop.io.retry.RetryProxy; 58 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 59 import org.apache.hadoop.net.NetUtils; 60 import org.apache.hadoop.security.Credentials; 61 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; 62 import org.apache.hadoop.service.Service.STATE; 63 import org.apache.hadoop.service.ServiceOperations; 64 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 65 import org.apache.hadoop.yarn.api.records.ApplicationId; 66 import org.apache.hadoop.yarn.api.records.ContainerId; 67 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 68 import org.apache.hadoop.yarn.api.records.ContainerState; 69 import org.apache.hadoop.yarn.api.records.ContainerStatus; 70 import org.apache.hadoop.yarn.api.records.NodeId; 71 import org.apache.hadoop.yarn.api.records.Resource; 72 import org.apache.hadoop.yarn.api.records.Token; 73 import org.apache.hadoop.yarn.client.RMProxy; 74 import org.apache.hadoop.yarn.conf.YarnConfiguration; 75 import org.apache.hadoop.yarn.event.Dispatcher; 76 import org.apache.hadoop.yarn.event.EventHandler; 77 import org.apache.hadoop.yarn.exceptions.YarnException; 78 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 79 import org.apache.hadoop.yarn.factories.RecordFactory; 80 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 81 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; 82 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; 83 import org.apache.hadoop.yarn.server.api.ResourceTracker; 84 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; 85 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; 86 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; 87 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; 88 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; 89 import org.apache.hadoop.yarn.server.api.records.MasterKey; 90 import org.apache.hadoop.yarn.server.api.records.NodeAction; 91 import org.apache.hadoop.yarn.server.api.records.NodeStatus; 92 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; 93 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; 94 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; 95 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; 96 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; 97 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; 98 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; 99 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; 100 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; 101 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; 102 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; 103 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; 104 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; 105 import org.apache.hadoop.yarn.server.utils.BuilderUtils; 106 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; 107 import org.junit.After; 108 import org.junit.Assert; 109 import org.junit.Before; 110 import org.junit.Test; 111 112 @SuppressWarnings("rawtypes") 113 public class TestNodeStatusUpdater { 114 115 // temp fix until metrics system can auto-detect itself running in unit test: 116 static { 117 DefaultMetricsSystem.setMiniClusterMode(true); 118 } 119 120 static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class); 121 static final File basedir = 122 new File("target", TestNodeStatusUpdater.class.getName()); 123 static final File nmLocalDir = new File(basedir, "nm0"); 124 static final File tmpDir = new File(basedir, "tmpDir"); 125 static final File remoteLogsDir = new File(basedir, "remotelogs"); 126 static final File logsDir = new File(basedir, "logs"); 127 private static final RecordFactory recordFactory = RecordFactoryProvider 128 .getRecordFactory(null); 129 130 volatile int heartBeatID = 0; 131 volatile Throwable nmStartError = null; 132 private final List<NodeId> registeredNodes = new ArrayList<NodeId>(); 133 private boolean triggered = false; 134 private Configuration conf; 135 private NodeManager nm; 136 private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); 137 138 @Before setUp()139 public void setUp() { 140 nmLocalDir.mkdirs(); 141 tmpDir.mkdirs(); 142 logsDir.mkdirs(); 143 remoteLogsDir.mkdirs(); 144 conf = createNMConfig(); 145 } 146 147 @After tearDown()148 public void tearDown() { 149 this.registeredNodes.clear(); 150 heartBeatID = 0; 151 ServiceOperations.stop(nm); 152 assertionFailedInThread.set(false); 153 DefaultMetricsSystem.shutdown(); 154 } 155 createMasterKey()156 public static MasterKey createMasterKey() { 157 MasterKey masterKey = new MasterKeyPBImpl(); 158 masterKey.setKeyId(123); 159 masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123) 160 .byteValue() })); 161 return masterKey; 162 } 163 164 private class MyResourceTracker implements ResourceTracker { 165 166 private final Context context; 167 MyResourceTracker(Context context)168 public MyResourceTracker(Context context) { 169 this.context = context; 170 } 171 172 @Override registerNodeManager( RegisterNodeManagerRequest request)173 public RegisterNodeManagerResponse registerNodeManager( 174 RegisterNodeManagerRequest request) throws YarnException, 175 IOException { 176 NodeId nodeId = request.getNodeId(); 177 Resource resource = request.getResource(); 178 LOG.info("Registering " + nodeId.toString()); 179 // NOTE: this really should be checking against the config value 180 InetSocketAddress expected = NetUtils.getConnectAddress( 181 conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1)); 182 Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString()); 183 Assert.assertEquals(5 * 1024, resource.getMemory()); 184 registeredNodes.add(nodeId); 185 186 RegisterNodeManagerResponse response = recordFactory 187 .newRecordInstance(RegisterNodeManagerResponse.class); 188 response.setContainerTokenMasterKey(createMasterKey()); 189 response.setNMTokenMasterKey(createMasterKey()); 190 return response; 191 } 192 getAppToContainerStatusMap( List<ContainerStatus> containers)193 private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap( 194 List<ContainerStatus> containers) { 195 Map<ApplicationId, List<ContainerStatus>> map = 196 new HashMap<ApplicationId, List<ContainerStatus>>(); 197 for (ContainerStatus cs : containers) { 198 ApplicationId applicationId = 199 cs.getContainerId().getApplicationAttemptId().getApplicationId(); 200 List<ContainerStatus> appContainers = map.get(applicationId); 201 if (appContainers == null) { 202 appContainers = new ArrayList<ContainerStatus>(); 203 map.put(applicationId, appContainers); 204 } 205 appContainers.add(cs); 206 } 207 return map; 208 } 209 210 @Override nodeHeartbeat(NodeHeartbeatRequest request)211 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 212 throws YarnException, IOException { 213 NodeStatus nodeStatus = request.getNodeStatus(); 214 LOG.info("Got heartbeat number " + heartBeatID); 215 NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class); 216 Dispatcher mockDispatcher = mock(Dispatcher.class); 217 EventHandler mockEventHandler = mock(EventHandler.class); 218 when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); 219 NMStateStoreService stateStore = new NMNullStateStoreService(); 220 nodeStatus.setResponseId(heartBeatID++); 221 Map<ApplicationId, List<ContainerStatus>> appToContainers = 222 getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); 223 224 ApplicationId appId1 = ApplicationId.newInstance(0, 1); 225 ApplicationId appId2 = ApplicationId.newInstance(0, 2); 226 227 if (heartBeatID == 1) { 228 Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); 229 230 // Give a container to the NM. 231 ApplicationAttemptId appAttemptID = 232 ApplicationAttemptId.newInstance(appId1, 0); 233 ContainerId firstContainerID = 234 ContainerId.newContainerId(appAttemptID, heartBeatID); 235 ContainerLaunchContext launchContext = recordFactory 236 .newRecordInstance(ContainerLaunchContext.class); 237 Resource resource = BuilderUtils.newResource(2, 1); 238 long currentTime = System.currentTimeMillis(); 239 String user = "testUser"; 240 ContainerTokenIdentifier containerToken = BuilderUtils 241 .newContainerTokenIdentifier(BuilderUtils.newContainerToken( 242 firstContainerID, InetAddress.getByName("localhost") 243 .getCanonicalHostName(), 1234, user, resource, 244 currentTime + 10000, 123, "password".getBytes(), currentTime)); 245 Container container = new ContainerImpl(conf, mockDispatcher, 246 stateStore, launchContext, null, mockMetrics, containerToken); 247 this.context.getContainers().put(firstContainerID, container); 248 } else if (heartBeatID == 2) { 249 // Checks on the RM end 250 Assert.assertEquals("Number of applications should only be one!", 1, 251 nodeStatus.getContainersStatuses().size()); 252 Assert.assertEquals("Number of container for the app should be one!", 253 1, appToContainers.get(appId1).size()); 254 255 // Checks on the NM end 256 ConcurrentMap<ContainerId, Container> activeContainers = 257 this.context.getContainers(); 258 Assert.assertEquals(1, activeContainers.size()); 259 260 // Give another container to the NM. 261 ApplicationAttemptId appAttemptID = 262 ApplicationAttemptId.newInstance(appId2, 0); 263 ContainerId secondContainerID = 264 ContainerId.newContainerId(appAttemptID, heartBeatID); 265 ContainerLaunchContext launchContext = recordFactory 266 .newRecordInstance(ContainerLaunchContext.class); 267 long currentTime = System.currentTimeMillis(); 268 String user = "testUser"; 269 Resource resource = BuilderUtils.newResource(3, 1); 270 ContainerTokenIdentifier containerToken = BuilderUtils 271 .newContainerTokenIdentifier(BuilderUtils.newContainerToken( 272 secondContainerID, InetAddress.getByName("localhost") 273 .getCanonicalHostName(), 1234, user, resource, 274 currentTime + 10000, 123, "password".getBytes(), currentTime)); 275 Container container = new ContainerImpl(conf, mockDispatcher, 276 stateStore, launchContext, null, mockMetrics, containerToken); 277 this.context.getContainers().put(secondContainerID, container); 278 } else if (heartBeatID == 3) { 279 // Checks on the RM end 280 Assert.assertEquals("Number of applications should have two!", 2, 281 appToContainers.size()); 282 Assert.assertEquals("Number of container for the app-1 should be only one!", 283 1, appToContainers.get(appId1).size()); 284 Assert.assertEquals("Number of container for the app-2 should be only one!", 285 1, appToContainers.get(appId2).size()); 286 287 // Checks on the NM end 288 ConcurrentMap<ContainerId, Container> activeContainers = 289 this.context.getContainers(); 290 Assert.assertEquals(2, activeContainers.size()); 291 } 292 293 NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. 294 newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null, 295 1000L); 296 return nhResponse; 297 } 298 } 299 300 private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { 301 public ResourceTracker resourceTracker; 302 private Context context; 303 MyNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics)304 public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, 305 NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { 306 super(context, dispatcher, healthChecker, metrics); 307 this.context = context; 308 resourceTracker = new MyResourceTracker(this.context); 309 } 310 311 @Override getRMClient()312 protected ResourceTracker getRMClient() { 313 return resourceTracker; 314 } 315 316 @Override stopRMProxy()317 protected void stopRMProxy() { 318 return; 319 } 320 } 321 322 // Test NodeStatusUpdater sends the right container statuses each time it 323 // heart beats. 324 private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl { 325 public ResourceTracker resourceTracker; 326 MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics)327 public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, 328 NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { 329 super(context, dispatcher, healthChecker, metrics); 330 resourceTracker = new MyResourceTracker4(context); 331 } 332 333 @Override getRMClient()334 protected ResourceTracker getRMClient() { 335 return resourceTracker; 336 } 337 338 @Override stopRMProxy()339 protected void stopRMProxy() { 340 return; 341 } 342 } 343 344 private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { 345 public ResourceTracker resourceTracker; 346 private Context context; 347 MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics)348 public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, 349 NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { 350 super(context, dispatcher, healthChecker, metrics); 351 this.context = context; 352 this.resourceTracker = new MyResourceTracker3(this.context); 353 } 354 355 @Override getRMClient()356 protected ResourceTracker getRMClient() { 357 return resourceTracker; 358 } 359 360 @Override stopRMProxy()361 protected void stopRMProxy() { 362 return; 363 } 364 365 @Override isTokenKeepAliveEnabled(Configuration conf)366 protected boolean isTokenKeepAliveEnabled(Configuration conf) { 367 return true; 368 } 369 } 370 371 private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl { 372 373 private final long rmStartIntervalMS; 374 private final boolean rmNeverStart; 375 public ResourceTracker resourceTracker; MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, long rmStartIntervalMS, boolean rmNeverStart)376 public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, 377 NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, 378 long rmStartIntervalMS, boolean rmNeverStart) { 379 super(context, dispatcher, healthChecker, metrics); 380 this.rmStartIntervalMS = rmStartIntervalMS; 381 this.rmNeverStart = rmNeverStart; 382 } 383 384 @Override serviceStart()385 protected void serviceStart() throws Exception { 386 //record the startup time 387 super.serviceStart(); 388 } 389 390 @Override getRMClient()391 protected ResourceTracker getRMClient() throws IOException { 392 RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); 393 resourceTracker = 394 (ResourceTracker) RetryProxy.create(ResourceTracker.class, 395 new MyResourceTracker6(rmStartIntervalMS, rmNeverStart), 396 retryPolicy); 397 return resourceTracker; 398 } 399 isTriggered()400 private boolean isTriggered() { 401 return triggered; 402 } 403 404 @Override stopRMProxy()405 protected void stopRMProxy() { 406 return; 407 } 408 } 409 410 411 412 private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl { 413 private ResourceTracker resourceTracker; 414 private Configuration conf; 415 MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf)416 public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, 417 NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) { 418 super(context, dispatcher, healthChecker, metrics); 419 resourceTracker = new MyResourceTracker5(); 420 this.conf = conf; 421 } 422 423 @Override getRMClient()424 protected ResourceTracker getRMClient() { 425 RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); 426 return (ResourceTracker) RetryProxy.create(ResourceTracker.class, 427 resourceTracker, retryPolicy); 428 } 429 430 @Override stopRMProxy()431 protected void stopRMProxy() { 432 return; 433 } 434 } 435 436 private class MyNodeManager extends NodeManager { 437 438 private MyNodeStatusUpdater3 nodeStatusUpdater; 439 @Override createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker)440 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 441 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 442 this.nodeStatusUpdater = 443 new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics); 444 return this.nodeStatusUpdater; 445 } 446 getNodeStatusUpdater()447 public MyNodeStatusUpdater3 getNodeStatusUpdater() { 448 return this.nodeStatusUpdater; 449 } 450 } 451 452 private class MyNodeManager2 extends NodeManager { 453 public boolean isStopped = false; 454 private NodeStatusUpdater nodeStatusUpdater; 455 private CyclicBarrier syncBarrier; 456 private Configuration conf; 457 MyNodeManager2(CyclicBarrier syncBarrier, Configuration conf)458 public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) { 459 this.syncBarrier = syncBarrier; 460 this.conf = conf; 461 } 462 @Override createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker)463 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 464 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 465 nodeStatusUpdater = 466 new MyNodeStatusUpdater5(context, dispatcher, healthChecker, 467 metrics, conf); 468 return nodeStatusUpdater; 469 } 470 471 @Override serviceStop()472 protected void serviceStop() throws Exception { 473 System.out.println("Called stooppppp"); 474 super.serviceStop(); 475 isStopped = true; 476 ConcurrentMap<ApplicationId, Application> applications = 477 getNMContext().getApplications(); 478 // ensure that applications are empty 479 if(!applications.isEmpty()) { 480 assertionFailedInThread.set(true); 481 } 482 syncBarrier.await(10000, TimeUnit.MILLISECONDS); 483 } 484 } 485 // 486 private class MyResourceTracker2 implements ResourceTracker { 487 public NodeAction heartBeatNodeAction = NodeAction.NORMAL; 488 public NodeAction registerNodeAction = NodeAction.NORMAL; 489 public String shutDownMessage = ""; 490 public String rmVersion = "3.0.1"; 491 492 @Override registerNodeManager( RegisterNodeManagerRequest request)493 public RegisterNodeManagerResponse registerNodeManager( 494 RegisterNodeManagerRequest request) throws YarnException, 495 IOException { 496 497 RegisterNodeManagerResponse response = recordFactory 498 .newRecordInstance(RegisterNodeManagerResponse.class); 499 response.setNodeAction(registerNodeAction ); 500 response.setContainerTokenMasterKey(createMasterKey()); 501 response.setNMTokenMasterKey(createMasterKey()); 502 response.setDiagnosticsMessage(shutDownMessage); 503 response.setRMVersion(rmVersion); 504 return response; 505 } 506 @Override nodeHeartbeat(NodeHeartbeatRequest request)507 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 508 throws YarnException, IOException { 509 NodeStatus nodeStatus = request.getNodeStatus(); 510 nodeStatus.setResponseId(heartBeatID++); 511 512 NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. 513 newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, 514 null, null, null, 1000L); 515 nhResponse.setDiagnosticsMessage(shutDownMessage); 516 return nhResponse; 517 } 518 } 519 520 private class MyResourceTracker3 implements ResourceTracker { 521 public NodeAction heartBeatNodeAction = NodeAction.NORMAL; 522 public NodeAction registerNodeAction = NodeAction.NORMAL; 523 private Map<ApplicationId, List<Long>> keepAliveRequests = 524 new HashMap<ApplicationId, List<Long>>(); 525 private ApplicationId appId = BuilderUtils.newApplicationId(1, 1); 526 private final Context context; 527 MyResourceTracker3(Context context)528 MyResourceTracker3(Context context) { 529 this.context = context; 530 } 531 532 @Override registerNodeManager( RegisterNodeManagerRequest request)533 public RegisterNodeManagerResponse registerNodeManager( 534 RegisterNodeManagerRequest request) throws YarnException, 535 IOException { 536 537 RegisterNodeManagerResponse response = 538 recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); 539 response.setNodeAction(registerNodeAction); 540 response.setContainerTokenMasterKey(createMasterKey()); 541 response.setNMTokenMasterKey(createMasterKey()); 542 return response; 543 } 544 545 @Override nodeHeartbeat(NodeHeartbeatRequest request)546 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 547 throws YarnException, IOException { 548 LOG.info("Got heartBeatId: [" + heartBeatID +"]"); 549 NodeStatus nodeStatus = request.getNodeStatus(); 550 nodeStatus.setResponseId(heartBeatID++); 551 NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. 552 newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, 553 null, null, null, 1000L); 554 555 if (nodeStatus.getKeepAliveApplications() != null 556 && nodeStatus.getKeepAliveApplications().size() > 0) { 557 for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) { 558 List<Long> list = keepAliveRequests.get(appId); 559 if (list == null) { 560 list = new LinkedList<Long>(); 561 keepAliveRequests.put(appId, list); 562 } 563 list.add(System.currentTimeMillis()); 564 } 565 } 566 if (heartBeatID == 2) { 567 LOG.info("Sending FINISH_APP for application: [" + appId + "]"); 568 this.context.getApplications().put(appId, mock(Application.class)); 569 nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId)); 570 } 571 return nhResponse; 572 } 573 } 574 575 // Test NodeStatusUpdater sends the right container statuses each time it 576 // heart beats. 577 private Credentials expectedCredentials = new Credentials(); 578 private class MyResourceTracker4 implements ResourceTracker { 579 580 public NodeAction registerNodeAction = NodeAction.NORMAL; 581 public NodeAction heartBeatNodeAction = NodeAction.NORMAL; 582 private Context context; 583 private final ContainerStatus containerStatus2 = 584 createContainerStatus(2, ContainerState.RUNNING); 585 private final ContainerStatus containerStatus3 = 586 createContainerStatus(3, ContainerState.COMPLETE); 587 private final ContainerStatus containerStatus4 = 588 createContainerStatus(4, ContainerState.RUNNING); 589 private final ContainerStatus containerStatus5 = 590 createContainerStatus(5, ContainerState.COMPLETE); 591 MyResourceTracker4(Context context)592 public MyResourceTracker4(Context context) { 593 // create app Credentials 594 org.apache.hadoop.security.token.Token<DelegationTokenIdentifier> token1 = 595 new org.apache.hadoop.security.token.Token<DelegationTokenIdentifier>(); 596 token1.setKind(new Text("kind1")); 597 expectedCredentials.addToken(new Text("token1"), token1); 598 this.context = context; 599 } 600 601 @Override registerNodeManager( RegisterNodeManagerRequest request)602 public RegisterNodeManagerResponse registerNodeManager( 603 RegisterNodeManagerRequest request) throws YarnException, IOException { 604 RegisterNodeManagerResponse response = 605 recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); 606 response.setNodeAction(registerNodeAction); 607 response.setContainerTokenMasterKey(createMasterKey()); 608 response.setNMTokenMasterKey(createMasterKey()); 609 return response; 610 } 611 612 @Override nodeHeartbeat(NodeHeartbeatRequest request)613 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 614 throws YarnException, IOException { 615 List<ContainerId> finishedContainersPulledByAM = new ArrayList 616 <ContainerId>(); 617 try { 618 if (heartBeatID == 0) { 619 Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses() 620 .size()); 621 Assert.assertEquals(0, context.getContainers().size()); 622 } else if (heartBeatID == 1) { 623 List<ContainerStatus> statuses = 624 request.getNodeStatus().getContainersStatuses(); 625 Assert.assertEquals(2, statuses.size()); 626 Assert.assertEquals(2, context.getContainers().size()); 627 628 boolean container2Exist = false, container3Exist = false; 629 for (ContainerStatus status : statuses) { 630 if (status.getContainerId().equals( 631 containerStatus2.getContainerId())) { 632 Assert.assertTrue(status.getState().equals( 633 containerStatus2.getState())); 634 container2Exist = true; 635 } 636 if (status.getContainerId().equals( 637 containerStatus3.getContainerId())) { 638 Assert.assertTrue(status.getState().equals( 639 containerStatus3.getState())); 640 container3Exist = true; 641 } 642 } 643 Assert.assertTrue(container2Exist && container3Exist); 644 645 // should throw exception that can be retried by the 646 // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the 647 // test passes. 648 throw new YarnRuntimeException("Lost the heartbeat response"); 649 } else if (heartBeatID == 2 || heartBeatID == 3) { 650 List<ContainerStatus> statuses = 651 request.getNodeStatus().getContainersStatuses(); 652 if (heartBeatID == 2) { 653 // NM should send completed containers again, since the last 654 // heartbeat is lost. 655 Assert.assertEquals(4, statuses.size()); 656 } else { 657 // NM should not send completed containers again, since the last 658 // heartbeat is successful. 659 Assert.assertEquals(2, statuses.size()); 660 } 661 Assert.assertEquals(4, context.getContainers().size()); 662 663 boolean container2Exist = false, container3Exist = false, 664 container4Exist = false, container5Exist = false; 665 for (ContainerStatus status : statuses) { 666 if (status.getContainerId().equals( 667 containerStatus2.getContainerId())) { 668 Assert.assertTrue(status.getState().equals( 669 containerStatus2.getState())); 670 container2Exist = true; 671 } 672 if (status.getContainerId().equals( 673 containerStatus3.getContainerId())) { 674 Assert.assertTrue(status.getState().equals( 675 containerStatus3.getState())); 676 container3Exist = true; 677 } 678 if (status.getContainerId().equals( 679 containerStatus4.getContainerId())) { 680 Assert.assertTrue(status.getState().equals( 681 containerStatus4.getState())); 682 container4Exist = true; 683 } 684 if (status.getContainerId().equals( 685 containerStatus5.getContainerId())) { 686 Assert.assertTrue(status.getState().equals( 687 containerStatus5.getState())); 688 container5Exist = true; 689 } 690 } 691 if (heartBeatID == 2) { 692 Assert.assertTrue(container2Exist && container3Exist 693 && container4Exist && container5Exist); 694 } else { 695 // NM do not send completed containers again 696 Assert.assertTrue(container2Exist && !container3Exist 697 && container4Exist && !container5Exist); 698 } 699 700 if (heartBeatID == 3) { 701 finishedContainersPulledByAM.add(containerStatus3.getContainerId()); 702 } 703 } else if (heartBeatID == 4) { 704 List<ContainerStatus> statuses = 705 request.getNodeStatus().getContainersStatuses(); 706 Assert.assertEquals(2, statuses.size()); 707 // Container 3 is acked by AM, hence removed from context 708 Assert.assertEquals(3, context.getContainers().size()); 709 710 boolean container3Exist = false; 711 for (ContainerStatus status : statuses) { 712 if (status.getContainerId().equals( 713 containerStatus3.getContainerId())) { 714 container3Exist = true; 715 } 716 } 717 Assert.assertFalse(container3Exist); 718 } 719 } catch (AssertionError error) { 720 error.printStackTrace(); 721 assertionFailedInThread.set(true); 722 } finally { 723 heartBeatID++; 724 } 725 NodeStatus nodeStatus = request.getNodeStatus(); 726 nodeStatus.setResponseId(heartBeatID); 727 NodeHeartbeatResponse nhResponse = 728 YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, 729 heartBeatNodeAction, null, null, null, null, 1000L); 730 nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM); 731 Map<ApplicationId, ByteBuffer> appCredentials = 732 new HashMap<ApplicationId, ByteBuffer>(); 733 DataOutputBuffer dob = new DataOutputBuffer(); 734 expectedCredentials.writeTokenStorageToStream(dob); 735 ByteBuffer byteBuffer1 = 736 ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 737 appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1); 738 nhResponse.setSystemCredentialsForApps(appCredentials); 739 return nhResponse; 740 } 741 } 742 743 private class MyResourceTracker5 implements ResourceTracker { 744 public NodeAction registerNodeAction = NodeAction.NORMAL; 745 @Override registerNodeManager( RegisterNodeManagerRequest request)746 public RegisterNodeManagerResponse registerNodeManager( 747 RegisterNodeManagerRequest request) throws YarnException, 748 IOException { 749 750 RegisterNodeManagerResponse response = recordFactory 751 .newRecordInstance(RegisterNodeManagerResponse.class); 752 response.setNodeAction(registerNodeAction ); 753 response.setContainerTokenMasterKey(createMasterKey()); 754 response.setNMTokenMasterKey(createMasterKey()); 755 return response; 756 } 757 758 @Override nodeHeartbeat(NodeHeartbeatRequest request)759 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 760 throws YarnException, IOException { 761 heartBeatID++; 762 if(heartBeatID == 1) { 763 // EOFException should be retried as well. 764 throw new EOFException("NodeHeartbeat exception"); 765 } 766 else { 767 throw new java.net.ConnectException( 768 "NodeHeartbeat exception"); 769 } 770 } 771 } 772 773 private class MyResourceTracker6 implements ResourceTracker { 774 775 private long rmStartIntervalMS; 776 private boolean rmNeverStart; 777 private final long waitStartTime; 778 MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart)779 public MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart) { 780 this.rmStartIntervalMS = rmStartIntervalMS; 781 this.rmNeverStart = rmNeverStart; 782 this.waitStartTime = System.currentTimeMillis(); 783 } 784 785 @Override registerNodeManager( RegisterNodeManagerRequest request)786 public RegisterNodeManagerResponse registerNodeManager( 787 RegisterNodeManagerRequest request) throws YarnException, IOException, 788 IOException { 789 if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS 790 || rmNeverStart) { 791 throw new java.net.ConnectException("Faking RM start failure as start " 792 + "delay timer has not expired."); 793 } else { 794 NodeId nodeId = request.getNodeId(); 795 Resource resource = request.getResource(); 796 LOG.info("Registering " + nodeId.toString()); 797 // NOTE: this really should be checking against the config value 798 InetSocketAddress expected = NetUtils.getConnectAddress( 799 conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1)); 800 Assert.assertEquals(NetUtils.getHostPortString(expected), 801 nodeId.toString()); 802 Assert.assertEquals(5 * 1024, resource.getMemory()); 803 registeredNodes.add(nodeId); 804 805 RegisterNodeManagerResponse response = recordFactory 806 .newRecordInstance(RegisterNodeManagerResponse.class); 807 triggered = true; 808 return response; 809 } 810 } 811 812 @Override nodeHeartbeat(NodeHeartbeatRequest request)813 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) 814 throws YarnException, IOException { 815 NodeStatus nodeStatus = request.getNodeStatus(); 816 nodeStatus.setResponseId(heartBeatID++); 817 818 NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. 819 newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null, 820 null, null, null, 1000L); 821 return nhResponse; 822 } 823 } 824 825 @Before clearError()826 public void clearError() { 827 nmStartError = null; 828 } 829 830 @After deleteBaseDir()831 public void deleteBaseDir() throws IOException { 832 FileContext lfs = FileContext.getLocalFSFileContext(); 833 lfs.delete(new Path(basedir.getPath()), true); 834 } 835 836 @Test(timeout = 90000) testRecentlyFinishedContainers()837 public void testRecentlyFinishedContainers() throws Exception { 838 NodeManager nm = new NodeManager(); 839 YarnConfiguration conf = new YarnConfiguration(); 840 conf.set( 841 NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 842 "10000"); 843 nm.init(conf); 844 NodeStatusUpdaterImpl nodeStatusUpdater = 845 (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); 846 ApplicationId appId = ApplicationId.newInstance(0, 0); 847 ApplicationAttemptId appAttemptId = 848 ApplicationAttemptId.newInstance(appId, 0); 849 ContainerId cId = ContainerId.newContainerId(appAttemptId, 0); 850 nm.getNMContext().getApplications().putIfAbsent(appId, 851 mock(Application.class)); 852 nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class)); 853 854 nodeStatusUpdater.addCompletedContainer(cId); 855 Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); 856 857 nm.getNMContext().getContainers().remove(cId); 858 long time1 = System.currentTimeMillis(); 859 int waitInterval = 15; 860 while (waitInterval-- > 0 861 && nodeStatusUpdater.isContainerRecentlyStopped(cId)) { 862 nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); 863 Thread.sleep(1000); 864 } 865 long time2 = System.currentTimeMillis(); 866 // By this time the container will be removed from cache. need to verify. 867 Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); 868 Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000); 869 } 870 871 @Test(timeout = 90000) testRemovePreviousCompletedContainersFromContext()872 public void testRemovePreviousCompletedContainersFromContext() throws Exception { 873 NodeManager nm = new NodeManager(); 874 YarnConfiguration conf = new YarnConfiguration(); 875 conf.set( 876 NodeStatusUpdaterImpl 877 .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 878 "10000"); 879 nm.init(conf); 880 NodeStatusUpdaterImpl nodeStatusUpdater = 881 (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); 882 ApplicationId appId = ApplicationId.newInstance(0, 0); 883 ApplicationAttemptId appAttemptId = 884 ApplicationAttemptId.newInstance(appId, 0); 885 886 ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); 887 Token containerToken = 888 BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", 889 BuilderUtils.newResource(1024, 1), 0, 123, 890 "password".getBytes(), 0); 891 Container anyCompletedContainer = new ContainerImpl(conf, null, 892 null, null, null, null, 893 BuilderUtils.newContainerTokenIdentifier(containerToken)) { 894 895 @Override 896 public ContainerState getCurrentState() { 897 return ContainerState.COMPLETE; 898 } 899 900 @Override 901 public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() { 902 return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE; 903 } 904 }; 905 906 ContainerId runningContainerId = 907 ContainerId.newContainerId(appAttemptId, 3); 908 Token runningContainerToken = 909 BuilderUtils.newContainerToken(runningContainerId, "anyHost", 910 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, 911 "password".getBytes(), 0); 912 Container runningContainer = 913 new ContainerImpl(conf, null, null, null, null, null, 914 BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) { 915 @Override 916 public ContainerState getCurrentState() { 917 return ContainerState.RUNNING; 918 } 919 920 @Override 921 public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() { 922 return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING; 923 } 924 }; 925 926 nm.getNMContext().getApplications().putIfAbsent(appId, 927 mock(Application.class)); 928 nm.getNMContext().getContainers().put(cId, anyCompletedContainer); 929 nm.getNMContext().getContainers() 930 .put(runningContainerId, runningContainer); 931 932 Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size()); 933 934 List<ContainerId> ackedContainers = new ArrayList<ContainerId>(); 935 ackedContainers.add(cId); 936 ackedContainers.add(runningContainerId); 937 938 nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers); 939 940 Set<ContainerId> containerIdSet = new HashSet<ContainerId>(); 941 List<ContainerStatus> containerStatuses = nodeStatusUpdater.getContainerStatuses(); 942 for (ContainerStatus status : containerStatuses) { 943 containerIdSet.add(status.getContainerId()); 944 } 945 946 Assert.assertEquals(1, containerStatuses.size()); 947 // completed container is removed; 948 Assert.assertFalse(containerIdSet.contains(cId)); 949 // running container is not removed; 950 Assert.assertTrue(containerIdSet.contains(runningContainerId)); 951 } 952 953 @Test(timeout = 10000) testCompletedContainersIsRecentlyStopped()954 public void testCompletedContainersIsRecentlyStopped() throws Exception { 955 NodeManager nm = new NodeManager(); 956 nm.init(conf); 957 NodeStatusUpdaterImpl nodeStatusUpdater = 958 (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); 959 ApplicationId appId = ApplicationId.newInstance(0, 0); 960 Application completedApp = mock(Application.class); 961 when(completedApp.getApplicationState()).thenReturn( 962 ApplicationState.FINISHED); 963 ApplicationAttemptId appAttemptId = 964 ApplicationAttemptId.newInstance(appId, 0); 965 ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); 966 Token containerToken = 967 BuilderUtils.newContainerToken(containerId, "host", 1234, "user", 968 BuilderUtils.newResource(1024, 1), 0, 123, 969 "password".getBytes(), 0); 970 Container completedContainer = new ContainerImpl(conf, null, 971 null, null, null, null, 972 BuilderUtils.newContainerTokenIdentifier(containerToken)) { 973 @Override 974 public ContainerState getCurrentState() { 975 return ContainerState.COMPLETE; 976 } 977 }; 978 979 nm.getNMContext().getApplications().putIfAbsent(appId, completedApp); 980 nm.getNMContext().getContainers().put(containerId, completedContainer); 981 982 Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); 983 Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped( 984 containerId)); 985 } 986 987 @Test testCleanedupApplicationContainerCleanup()988 public void testCleanedupApplicationContainerCleanup() throws IOException { 989 NodeManager nm = new NodeManager(); 990 YarnConfiguration conf = new YarnConfiguration(); 991 conf.set(NodeStatusUpdaterImpl 992 .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 993 "1000000"); 994 nm.init(conf); 995 996 NodeStatusUpdaterImpl nodeStatusUpdater = 997 (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); 998 ApplicationId appId = ApplicationId.newInstance(0, 0); 999 ApplicationAttemptId appAttemptId = 1000 ApplicationAttemptId.newInstance(appId, 0); 1001 1002 ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); 1003 Token containerToken = 1004 BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser", 1005 BuilderUtils.newResource(1024, 1), 0, 123, 1006 "password".getBytes(), 0); 1007 Container anyCompletedContainer = new ContainerImpl(conf, null, 1008 null, null, null, null, 1009 BuilderUtils.newContainerTokenIdentifier(containerToken)) { 1010 1011 @Override 1012 public ContainerState getCurrentState() { 1013 return ContainerState.COMPLETE; 1014 } 1015 }; 1016 1017 Application application = mock(Application.class); 1018 when(application.getApplicationState()).thenReturn(ApplicationState.RUNNING); 1019 nm.getNMContext().getApplications().putIfAbsent(appId, application); 1020 nm.getNMContext().getContainers().put(cId, anyCompletedContainer); 1021 1022 Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); 1023 1024 when(application.getApplicationState()).thenReturn( 1025 ApplicationState.FINISHING_CONTAINERS_WAIT); 1026 // The completed container will be saved in case of lost heartbeat. 1027 Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); 1028 Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); 1029 1030 nm.getNMContext().getContainers().put(cId, anyCompletedContainer); 1031 nm.getNMContext().getApplications().remove(appId); 1032 // The completed container will be saved in case of lost heartbeat. 1033 Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); 1034 Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); 1035 } 1036 1037 @Test testNMRegistration()1038 public void testNMRegistration() throws InterruptedException { 1039 nm = new NodeManager() { 1040 @Override 1041 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 1042 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1043 return new MyNodeStatusUpdater(context, dispatcher, healthChecker, 1044 metrics); 1045 } 1046 }; 1047 1048 YarnConfiguration conf = createNMConfig(); 1049 nm.init(conf); 1050 1051 // verify that the last service is the nodeStatusUpdater (ie registration 1052 // with RM) 1053 Object[] services = nm.getServices().toArray(); 1054 Object lastService = services[services.length-1]; 1055 Assert.assertTrue("last service is NOT the node status updater", 1056 lastService instanceof NodeStatusUpdater); 1057 1058 new Thread() { 1059 public void run() { 1060 try { 1061 nm.start(); 1062 } catch (Throwable e) { 1063 TestNodeStatusUpdater.this.nmStartError = e; 1064 throw new YarnRuntimeException(e); 1065 } 1066 } 1067 }.start(); 1068 1069 System.out.println(" ----- thread already started.." 1070 + nm.getServiceState()); 1071 1072 int waitCount = 0; 1073 while (nm.getServiceState() == STATE.INITED && waitCount++ != 50) { 1074 LOG.info("Waiting for NM to start.."); 1075 if (nmStartError != null) { 1076 LOG.error("Error during startup. ", nmStartError); 1077 Assert.fail(nmStartError.getCause().getMessage()); 1078 } 1079 Thread.sleep(2000); 1080 } 1081 if (nm.getServiceState() != STATE.STARTED) { 1082 // NM could have failed. 1083 Assert.fail("NodeManager failed to start"); 1084 } 1085 1086 waitCount = 0; 1087 while (heartBeatID <= 3 && waitCount++ != 200) { 1088 Thread.sleep(1000); 1089 } 1090 Assert.assertFalse(heartBeatID <= 3); 1091 Assert.assertEquals("Number of registered NMs is wrong!!", 1, 1092 this.registeredNodes.size()); 1093 1094 nm.stop(); 1095 } 1096 1097 @Test testStopReentrant()1098 public void testStopReentrant() throws Exception { 1099 final AtomicInteger numCleanups = new AtomicInteger(0); 1100 nm = new NodeManager() { 1101 @Override 1102 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 1103 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1104 MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( 1105 context, dispatcher, healthChecker, metrics); 1106 MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); 1107 myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN; 1108 myNodeStatusUpdater.resourceTracker = myResourceTracker2; 1109 return myNodeStatusUpdater; 1110 } 1111 1112 @Override 1113 protected ContainerManagerImpl createContainerManager(Context context, 1114 ContainerExecutor exec, DeletionService del, 1115 NodeStatusUpdater nodeStatusUpdater, 1116 ApplicationACLsManager aclsManager, 1117 LocalDirsHandlerService dirsHandler) { 1118 return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, 1119 metrics, aclsManager, dirsHandler) { 1120 1121 @Override 1122 public void cleanUpApplicationsOnNMShutDown() { 1123 super.cleanUpApplicationsOnNMShutDown(); 1124 numCleanups.incrementAndGet(); 1125 } 1126 }; 1127 } 1128 }; 1129 1130 YarnConfiguration conf = createNMConfig(); 1131 nm.init(conf); 1132 nm.start(); 1133 1134 int waitCount = 0; 1135 while (heartBeatID < 1 && waitCount++ != 200) { 1136 Thread.sleep(500); 1137 } 1138 Assert.assertFalse(heartBeatID < 1); 1139 1140 // Meanwhile call stop directly as the shutdown hook would 1141 nm.stop(); 1142 1143 // NM takes a while to reach the STOPPED state. 1144 waitCount = 0; 1145 while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { 1146 LOG.info("Waiting for NM to stop.."); 1147 Thread.sleep(1000); 1148 } 1149 1150 Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); 1151 Assert.assertEquals(numCleanups.get(), 1); 1152 } 1153 1154 @Test 1155 public void testNodeDecommision() throws Exception { 1156 nm = getNodeManager(NodeAction.SHUTDOWN); 1157 YarnConfiguration conf = createNMConfig(); 1158 nm.init(conf); 1159 Assert.assertEquals(STATE.INITED, nm.getServiceState()); 1160 nm.start(); 1161 1162 int waitCount = 0; 1163 while (heartBeatID < 1 && waitCount++ != 200) { 1164 Thread.sleep(500); 1165 } 1166 Assert.assertFalse(heartBeatID < 1); 1167 Assert.assertTrue(nm.getNMContext().getDecommissioned()); 1168 1169 // NM takes a while to reach the STOPPED state. 1170 waitCount = 0; 1171 while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { 1172 LOG.info("Waiting for NM to stop.."); 1173 Thread.sleep(1000); 1174 } 1175 1176 Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); 1177 } 1178 1179 private abstract class NodeManagerWithCustomNodeStatusUpdater extends NodeManager { 1180 private NodeStatusUpdater updater; 1181 1182 private NodeManagerWithCustomNodeStatusUpdater() { 1183 } 1184 1185 @Override 1186 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 1187 Dispatcher dispatcher, 1188 NodeHealthCheckerService healthChecker) { 1189 updater = createUpdater(context, dispatcher, healthChecker); 1190 return updater; 1191 } 1192 1193 public NodeStatusUpdater getUpdater() { 1194 return updater; 1195 } 1196 1197 abstract NodeStatusUpdater createUpdater(Context context, 1198 Dispatcher dispatcher, 1199 NodeHealthCheckerService healthChecker); 1200 } 1201 1202 @Test 1203 public void testNMShutdownForRegistrationFailure() throws Exception { 1204 1205 nm = new NodeManagerWithCustomNodeStatusUpdater() { 1206 @Override 1207 protected NodeStatusUpdater createUpdater(Context context, 1208 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1209 MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater( 1210 context, dispatcher, healthChecker, metrics); 1211 MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); 1212 myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN; 1213 myResourceTracker2.shutDownMessage = "RM Shutting Down Node"; 1214 nodeStatusUpdater.resourceTracker = myResourceTracker2; 1215 return nodeStatusUpdater; 1216 } 1217 }; 1218 verifyNodeStartFailure( 1219 "Recieved SHUTDOWN signal from Resourcemanager ," 1220 + "Registration of NodeManager failed, " 1221 + "Message from ResourceManager: RM Shutting Down Node"); 1222 } 1223 1224 @Test (timeout = 150000) 1225 public void testNMConnectionToRM() throws Exception { 1226 final long delta = 50000; 1227 final long connectionWaitMs = 5000; 1228 final long connectionRetryIntervalMs = 1000; 1229 //Waiting for rmStartIntervalMS, RM will be started 1230 final long rmStartIntervalMS = 2*1000; 1231 conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 1232 connectionWaitMs); 1233 conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 1234 connectionRetryIntervalMs); 1235 1236 //Test NM try to connect to RM Several times, but finally fail 1237 NodeManagerWithCustomNodeStatusUpdater nmWithUpdater; 1238 nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { 1239 @Override 1240 protected NodeStatusUpdater createUpdater(Context context, 1241 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1242 NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( 1243 context, dispatcher, healthChecker, metrics, 1244 rmStartIntervalMS, true); 1245 return nodeStatusUpdater; 1246 } 1247 }; 1248 nm.init(conf); 1249 long waitStartTime = System.currentTimeMillis(); 1250 try { 1251 nm.start(); 1252 Assert.fail("NM should have failed to start due to RM connect failure"); 1253 } catch(Exception e) { 1254 long t = System.currentTimeMillis(); 1255 long duration = t - waitStartTime; 1256 boolean waitTimeValid = (duration >= connectionWaitMs) 1257 && (duration < (connectionWaitMs + delta)); 1258 if(!waitTimeValid) { 1259 //either the exception was too early, or it had a different cause. 1260 //reject with the inner stack trace 1261 throw new Exception("NM should have tried re-connecting to RM during " + 1262 "period of at least " + connectionWaitMs + " ms, but " + 1263 "stopped retrying within " + (connectionWaitMs + delta) + 1264 " ms: " + e, e); 1265 } 1266 } 1267 1268 //Test NM connect to RM, fail at first several attempts, 1269 //but finally success. 1270 nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() { 1271 @Override 1272 protected NodeStatusUpdater createUpdater(Context context, 1273 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1274 NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( 1275 context, dispatcher, healthChecker, metrics, rmStartIntervalMS, 1276 false); 1277 return nodeStatusUpdater; 1278 } 1279 }; 1280 nm.init(conf); 1281 NodeStatusUpdater updater = nmWithUpdater.getUpdater(); 1282 Assert.assertNotNull("Updater not yet created ", updater); 1283 waitStartTime = System.currentTimeMillis(); 1284 try { 1285 nm.start(); 1286 } catch (Exception ex){ 1287 LOG.error("NM should have started successfully " + 1288 "after connecting to RM.", ex); 1289 throw ex; 1290 } 1291 long duration = System.currentTimeMillis() - waitStartTime; 1292 MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater; 1293 Assert.assertTrue("NM started before updater triggered", 1294 myUpdater.isTriggered()); 1295 Assert.assertTrue("NM should have connected to RM after " 1296 +"the start interval of " + rmStartIntervalMS 1297 +": actual " + duration 1298 + " " + myUpdater, 1299 (duration >= rmStartIntervalMS)); 1300 Assert.assertTrue("NM should have connected to RM less than " 1301 + (rmStartIntervalMS + delta) 1302 +" milliseconds of RM starting up: actual " + duration 1303 + " " + myUpdater, 1304 (duration < (rmStartIntervalMS + delta))); 1305 } 1306 1307 /** 1308 * Verifies that if for some reason NM fails to start ContainerManager RPC 1309 * server, RM is oblivious to NM's presence. The behaviour is like this 1310 * because otherwise, NM will report to RM even if all its servers are not 1311 * started properly, RM will think that the NM is alive and will retire the NM 1312 * only after NM_EXPIRY interval. See MAPREDUCE-2749. 1313 */ 1314 @Test 1315 public void testNoRegistrationWhenNMServicesFail() throws Exception { 1316 1317 nm = new NodeManager() { 1318 @Override 1319 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 1320 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1321 return new MyNodeStatusUpdater(context, dispatcher, healthChecker, 1322 metrics); 1323 } 1324 1325 @Override 1326 protected ContainerManagerImpl createContainerManager(Context context, 1327 ContainerExecutor exec, DeletionService del, 1328 NodeStatusUpdater nodeStatusUpdater, 1329 ApplicationACLsManager aclsManager, 1330 LocalDirsHandlerService diskhandler) { 1331 return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, 1332 metrics, aclsManager, diskhandler) { 1333 @Override 1334 protected void serviceStart() { 1335 // Simulating failure of starting RPC server 1336 throw new YarnRuntimeException("Starting of RPC Server failed"); 1337 } 1338 }; 1339 } 1340 }; 1341 1342 verifyNodeStartFailure("Starting of RPC Server failed"); 1343 } 1344 1345 @Test 1346 public void testApplicationKeepAlive() throws Exception { 1347 MyNodeManager nm = new MyNodeManager(); 1348 try { 1349 YarnConfiguration conf = createNMConfig(); 1350 conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); 1351 conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1352 4000l); 1353 nm.init(conf); 1354 nm.start(); 1355 // HB 2 -> app cancelled by RM. 1356 while (heartBeatID < 12) { 1357 Thread.sleep(1000l); 1358 } 1359 MyResourceTracker3 rt = 1360 (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient(); 1361 rt.context.getApplications().remove(rt.appId); 1362 Assert.assertEquals(1, rt.keepAliveRequests.size()); 1363 int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size(); 1364 LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]"); 1365 Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3); 1366 while (heartBeatID < 20) { 1367 Thread.sleep(1000l); 1368 } 1369 int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size(); 1370 Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2); 1371 } finally { 1372 if (nm.getServiceState() == STATE.STARTED) 1373 nm.stop(); 1374 } 1375 } 1376 1377 /** 1378 * Test completed containerStatus get back up when heart beat lost, and will 1379 * be sent via next heart beat. 1380 */ 1381 @Test(timeout = 200000) 1382 public void testCompletedContainerStatusBackup() throws Exception { 1383 nm = new NodeManager() { 1384 @Override 1385 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 1386 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1387 MyNodeStatusUpdater2 myNodeStatusUpdater = 1388 new MyNodeStatusUpdater2(context, dispatcher, healthChecker, 1389 metrics); 1390 return myNodeStatusUpdater; 1391 } 1392 1393 @Override 1394 protected NMContext createNMContext( 1395 NMContainerTokenSecretManager containerTokenSecretManager, 1396 NMTokenSecretManagerInNM nmTokenSecretManager, 1397 NMStateStoreService store) { 1398 return new MyNMContext(containerTokenSecretManager, 1399 nmTokenSecretManager); 1400 } 1401 }; 1402 1403 YarnConfiguration conf = createNMConfig(); 1404 nm.init(conf); 1405 nm.start(); 1406 1407 int waitCount = 0; 1408 while (heartBeatID <= 4 && waitCount++ != 20) { 1409 Thread.sleep(500); 1410 } 1411 if (heartBeatID <= 4) { 1412 Assert.fail("Failed to get all heartbeats in time, " + 1413 "heartbeatID:" + heartBeatID); 1414 } 1415 if(assertionFailedInThread.get()) { 1416 Assert.fail("ContainerStatus Backup failed"); 1417 } 1418 Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps() 1419 .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1"))); 1420 nm.stop(); 1421 } 1422 1423 @Test(timeout = 200000) 1424 public void testNodeStatusUpdaterRetryAndNMShutdown() 1425 throws Exception { 1426 final long connectionWaitSecs = 1000; 1427 final long connectionRetryIntervalMs = 1000; 1428 YarnConfiguration conf = createNMConfig(); 1429 conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 1430 connectionWaitSecs); 1431 conf.setLong(YarnConfiguration 1432 .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 1433 connectionRetryIntervalMs); 1434 conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); 1435 conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); 1436 CyclicBarrier syncBarrier = new CyclicBarrier(2); 1437 nm = new MyNodeManager2(syncBarrier, conf); 1438 nm.init(conf); 1439 nm.start(); 1440 // start a container 1441 ContainerId cId = TestNodeManagerShutdown.createContainerId(); 1442 FileContext localFS = FileContext.getLocalFSFileContext(); 1443 TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir, 1444 new File("start_file.txt")); 1445 1446 try { 1447 syncBarrier.await(10000, TimeUnit.MILLISECONDS); 1448 } catch (Exception e) { 1449 } 1450 Assert.assertFalse("Containers not cleaned up when NM stopped", 1451 assertionFailedInThread.get()); 1452 Assert.assertTrue(((MyNodeManager2) nm).isStopped); 1453 Assert.assertTrue("calculate heartBeatCount based on" + 1454 " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2); 1455 } 1456 1457 @Test 1458 public void testRMVersionLessThanMinimum() throws InterruptedException { 1459 final AtomicInteger numCleanups = new AtomicInteger(0); 1460 YarnConfiguration conf = createNMConfig(); 1461 conf.set(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, "3.0.0"); 1462 nm = new NodeManager() { 1463 @Override 1464 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 1465 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1466 MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( 1467 context, dispatcher, healthChecker, metrics); 1468 MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); 1469 myResourceTracker2.heartBeatNodeAction = NodeAction.NORMAL; 1470 myResourceTracker2.rmVersion = "3.0.0"; 1471 myNodeStatusUpdater.resourceTracker = myResourceTracker2; 1472 return myNodeStatusUpdater; 1473 } 1474 1475 @Override 1476 protected ContainerManagerImpl createContainerManager(Context context, 1477 ContainerExecutor exec, DeletionService del, 1478 NodeStatusUpdater nodeStatusUpdater, 1479 ApplicationACLsManager aclsManager, 1480 LocalDirsHandlerService dirsHandler) { 1481 return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, 1482 metrics, aclsManager, dirsHandler) { 1483 1484 @Override 1485 public void cleanUpApplicationsOnNMShutDown() { 1486 super.cleanUpApplicationsOnNMShutDown(); 1487 numCleanups.incrementAndGet(); 1488 } 1489 }; 1490 } 1491 }; 1492 1493 nm.init(conf); 1494 nm.start(); 1495 1496 // NM takes a while to reach the STARTED state. 1497 int waitCount = 0; 1498 while (nm.getServiceState() != STATE.STARTED && waitCount++ != 20) { 1499 LOG.info("Waiting for NM to stop.."); 1500 Thread.sleep(1000); 1501 } 1502 Assert.assertTrue(nm.getServiceState() == STATE.STARTED); 1503 nm.stop(); 1504 } 1505 1506 @Test 1507 public void testConcurrentAccessToSystemCredentials(){ 1508 final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>(); 1509 ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]); 1510 ApplicationId applicationId = ApplicationId.newInstance(123456, 120); 1511 testCredentials.put(applicationId, byteBuffer); 1512 1513 final List<Throwable> exceptions = Collections.synchronizedList(new 1514 ArrayList<Throwable>()); 1515 1516 final int NUM_THREADS = 10; 1517 final CountDownLatch allDone = new CountDownLatch(NUM_THREADS); 1518 final ExecutorService threadPool = Executors.newFixedThreadPool( 1519 NUM_THREADS); 1520 1521 final AtomicBoolean stop = new AtomicBoolean(false); 1522 1523 try { 1524 for (int i = 0; i < NUM_THREADS; i++) { 1525 threadPool.submit(new Runnable() { 1526 @Override 1527 public void run() { 1528 try { 1529 for (int i = 0; i < 100 && !stop.get(); i++) { 1530 NodeHeartbeatResponse nodeHeartBeatResponse = 1531 newNodeHeartbeatResponse(0, NodeAction.NORMAL, 1532 null, null, null, null, 0); 1533 nodeHeartBeatResponse.setSystemCredentialsForApps( 1534 testCredentials); 1535 NodeHeartbeatResponseProto proto = 1536 ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse) 1537 .getProto(); 1538 Assert.assertNotNull(proto); 1539 } 1540 } catch (Throwable t) { 1541 exceptions.add(t); 1542 stop.set(true); 1543 } finally { 1544 allDone.countDown(); 1545 } 1546 } 1547 }); 1548 } 1549 1550 int testTimeout = 2; 1551 Assert.assertTrue("Timeout waiting for more than " + testTimeout + " " + 1552 "seconds", 1553 allDone.await(testTimeout, TimeUnit.SECONDS)); 1554 } catch (InterruptedException ie) { 1555 exceptions.add(ie); 1556 } finally { 1557 threadPool.shutdownNow(); 1558 } 1559 Assert.assertTrue("Test failed with exception(s)" + exceptions, 1560 exceptions.isEmpty()); 1561 } 1562 1563 // Add new containers info into NM context each time node heart beats. 1564 private class MyNMContext extends NMContext { 1565 1566 public MyNMContext( 1567 NMContainerTokenSecretManager containerTokenSecretManager, 1568 NMTokenSecretManagerInNM nmTokenSecretManager) { 1569 super(containerTokenSecretManager, nmTokenSecretManager, null, null, 1570 new NMNullStateStoreService()); 1571 } 1572 1573 @Override 1574 public ConcurrentMap<ContainerId, Container> getContainers() { 1575 if (heartBeatID == 0) { 1576 return containers; 1577 } else if (heartBeatID == 1) { 1578 ContainerStatus containerStatus2 = 1579 createContainerStatus(2, ContainerState.RUNNING); 1580 putMockContainer(containerStatus2); 1581 1582 ContainerStatus containerStatus3 = 1583 createContainerStatus(3, ContainerState.COMPLETE); 1584 putMockContainer(containerStatus3); 1585 return containers; 1586 } else if (heartBeatID == 2) { 1587 ContainerStatus containerStatus4 = 1588 createContainerStatus(4, ContainerState.RUNNING); 1589 putMockContainer(containerStatus4); 1590 1591 ContainerStatus containerStatus5 = 1592 createContainerStatus(5, ContainerState.COMPLETE); 1593 putMockContainer(containerStatus5); 1594 return containers; 1595 } else if (heartBeatID == 3 || heartBeatID == 4) { 1596 return containers; 1597 } else { 1598 containers.clear(); 1599 return containers; 1600 } 1601 } 1602 1603 private void putMockContainer(ContainerStatus containerStatus) { 1604 Container container = getMockContainer(containerStatus); 1605 containers.put(containerStatus.getContainerId(), container); 1606 applications.putIfAbsent(containerStatus.getContainerId() 1607 .getApplicationAttemptId().getApplicationId(), 1608 mock(Application.class)); 1609 } 1610 } 1611 1612 public static ContainerStatus createContainerStatus(int id, 1613 ContainerState containerState) { 1614 ApplicationId applicationId = ApplicationId.newInstance(0, 1); 1615 ApplicationAttemptId applicationAttemptId = 1616 ApplicationAttemptId.newInstance(applicationId, 1); 1617 ContainerId contaierId = ContainerId.newContainerId(applicationAttemptId, id); 1618 ContainerStatus containerStatus = 1619 BuilderUtils.newContainerStatus(contaierId, containerState, 1620 "test_containerStatus: id=" + id + ", containerState: " 1621 + containerState, 0); 1622 return containerStatus; 1623 } 1624 1625 public static Container getMockContainer(ContainerStatus containerStatus) { 1626 ContainerImpl container = mock(ContainerImpl.class); 1627 when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus); 1628 when(container.getCurrentState()).thenReturn(containerStatus.getState()); 1629 when(container.getContainerId()).thenReturn( 1630 containerStatus.getContainerId()); 1631 if (containerStatus.getState().equals(ContainerState.COMPLETE)) { 1632 when(container.getContainerState()) 1633 .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE); 1634 } else if (containerStatus.getState().equals(ContainerState.RUNNING)) { 1635 when(container.getContainerState()) 1636 .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING); 1637 } 1638 return container; 1639 } 1640 1641 private void verifyNodeStartFailure(String errMessage) throws Exception { 1642 Assert.assertNotNull("nm is null", nm); 1643 YarnConfiguration conf = createNMConfig(); 1644 nm.init(conf); 1645 try { 1646 nm.start(); 1647 Assert.fail("NM should have failed to start. Didn't get exception!!"); 1648 } catch (Exception e) { 1649 //the version in trunk looked in the cause for equality 1650 // and assumed failures were nested. 1651 //this version assumes that error strings propagate to the base and 1652 //use a contains() test only. It should be less brittle 1653 if(!e.getMessage().contains(errMessage)) { 1654 throw e; 1655 } 1656 } 1657 1658 // the service should be stopped 1659 Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm 1660 .getServiceState()); 1661 1662 Assert.assertEquals("Number of registered nodes is wrong!", 0, 1663 this.registeredNodes.size()); 1664 } 1665 1666 private YarnConfiguration createNMConfig() { 1667 YarnConfiguration conf = new YarnConfiguration(); 1668 String localhostAddress = null; 1669 try { 1670 localhostAddress = InetAddress.getByName("localhost").getCanonicalHostName(); 1671 } catch (UnknownHostException e) { 1672 Assert.fail("Unable to get localhost address: " + e.getMessage()); 1673 } 1674 conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB 1675 conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345"); 1676 conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); 1677 conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); 1678 conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, 1679 remoteLogsDir.getAbsolutePath()); 1680 conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); 1681 conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); 1682 return conf; 1683 } 1684 1685 private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { 1686 return new NodeManager() { 1687 @Override 1688 protected NodeStatusUpdater createNodeStatusUpdater(Context context, 1689 Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { 1690 MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater( 1691 context, dispatcher, healthChecker, metrics); 1692 MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2(); 1693 myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction; 1694 myNodeStatusUpdater.resourceTracker = myResourceTracker2; 1695 return myNodeStatusUpdater; 1696 } 1697 }; 1698 } 1699 } 1700