1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager.rmnode; 20 21 import java.util.ArrayList; 22 import java.util.EnumSet; 23 import java.util.HashSet; 24 import java.util.List; 25 import java.util.Set; 26 import java.util.TreeSet; 27 import java.util.concurrent.ConcurrentLinkedQueue; 28 import java.util.concurrent.locks.ReentrantReadWriteLock; 29 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; 30 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 31 32 import org.apache.commons.logging.Log; 33 import org.apache.commons.logging.LogFactory; 34 import org.apache.hadoop.classification.InterfaceAudience.Private; 35 import org.apache.hadoop.classification.InterfaceStability.Unstable; 36 import org.apache.hadoop.net.Node; 37 import org.apache.hadoop.security.UserGroupInformation; 38 import org.apache.hadoop.yarn.api.records.ApplicationId; 39 import org.apache.hadoop.yarn.api.records.ContainerId; 40 import org.apache.hadoop.yarn.api.records.ContainerState; 41 import org.apache.hadoop.yarn.api.records.ContainerStatus; 42 import org.apache.hadoop.yarn.api.records.NodeId; 43 import org.apache.hadoop.yarn.api.records.NodeState; 44 import org.apache.hadoop.yarn.api.records.Resource; 45 import org.apache.hadoop.yarn.api.records.ResourceOption; 46 import org.apache.hadoop.yarn.event.EventHandler; 47 import org.apache.hadoop.yarn.factories.RecordFactory; 48 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 49 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; 50 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; 51 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; 52 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; 53 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; 54 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; 55 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; 56 import org.apache.hadoop.yarn.server.resourcemanager.RMContext; 57 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; 58 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; 59 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; 60 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; 61 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; 62 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; 63 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; 64 import org.apache.hadoop.yarn.server.utils.BuilderUtils.ContainerIdComparator; 65 import org.apache.hadoop.yarn.state.InvalidStateTransitonException; 66 import org.apache.hadoop.yarn.state.MultipleArcTransition; 67 import org.apache.hadoop.yarn.state.SingleArcTransition; 68 import org.apache.hadoop.yarn.state.StateMachine; 69 import org.apache.hadoop.yarn.state.StateMachineFactory; 70 71 import com.google.common.annotations.VisibleForTesting; 72 73 /** 74 * This class is used to keep track of all the applications/containers 75 * running on a node. 76 * 77 */ 78 @Private 79 @Unstable 80 @SuppressWarnings("unchecked") 81 public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { 82 83 private static final Log LOG = LogFactory.getLog(RMNodeImpl.class); 84 85 private static final RecordFactory recordFactory = RecordFactoryProvider 86 .getRecordFactory(null); 87 88 private final ReadLock readLock; 89 private final WriteLock writeLock; 90 91 private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue; 92 private volatile boolean nextHeartBeat = true; 93 94 private final NodeId nodeId; 95 private final RMContext context; 96 private final String hostName; 97 private final int commandPort; 98 private int httpPort; 99 private final String nodeAddress; // The containerManager address 100 private String httpAddress; 101 private volatile Resource totalCapability; 102 private final Node node; 103 104 private String healthReport; 105 private long lastHealthReportTime; 106 private String nodeManagerVersion; 107 108 /* set of containers that have just launched */ 109 private final Set<ContainerId> launchedContainers = 110 new HashSet<ContainerId>(); 111 112 /* set of containers that need to be cleaned */ 113 private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( 114 new ContainerIdComparator()); 115 116 /* 117 * set of containers to notify NM to remove them from its context. Currently, 118 * this includes containers that were notified to AM about their completion 119 */ 120 private final Set<ContainerId> containersToBeRemovedFromNM = 121 new HashSet<ContainerId>(); 122 123 /* the list of applications that have finished and need to be purged */ 124 private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>(); 125 126 private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory 127 .newRecordInstance(NodeHeartbeatResponse.class); 128 129 private static final StateMachineFactory<RMNodeImpl, 130 NodeState, 131 RMNodeEventType, 132 RMNodeEvent> stateMachineFactory 133 = new StateMachineFactory<RMNodeImpl, 134 NodeState, 135 RMNodeEventType, 136 RMNodeEvent>(NodeState.NEW) 137 138 //Transitions from NEW state 139 .addTransition(NodeState.NEW, NodeState.RUNNING, 140 RMNodeEventType.STARTED, new AddNodeTransition()) 141 .addTransition(NodeState.NEW, NodeState.NEW, 142 RMNodeEventType.RESOURCE_UPDATE, 143 new UpdateNodeResourceWhenUnusableTransition()) 144 145 //Transitions from RUNNING state 146 .addTransition(NodeState.RUNNING, 147 EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), 148 RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) 149 .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, 150 RMNodeEventType.DECOMMISSION, 151 new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) 152 .addTransition(NodeState.RUNNING, NodeState.LOST, 153 RMNodeEventType.EXPIRE, 154 new DeactivateNodeTransition(NodeState.LOST)) 155 .addTransition(NodeState.RUNNING, NodeState.REBOOTED, 156 RMNodeEventType.REBOOTING, 157 new DeactivateNodeTransition(NodeState.REBOOTED)) 158 .addTransition(NodeState.RUNNING, NodeState.RUNNING, 159 RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) 160 .addTransition(NodeState.RUNNING, NodeState.RUNNING, 161 RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) 162 .addTransition(NodeState.RUNNING, NodeState.RUNNING, 163 RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, 164 new AddContainersToBeRemovedFromNMTransition()) 165 .addTransition(NodeState.RUNNING, NodeState.RUNNING, 166 RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) 167 .addTransition(NodeState.RUNNING, NodeState.RUNNING, 168 RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) 169 170 //Transitions from REBOOTED state 171 .addTransition(NodeState.REBOOTED, NodeState.REBOOTED, 172 RMNodeEventType.RESOURCE_UPDATE, 173 new UpdateNodeResourceWhenUnusableTransition()) 174 175 //Transitions from DECOMMISSIONED state 176 .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, 177 RMNodeEventType.RESOURCE_UPDATE, 178 new UpdateNodeResourceWhenUnusableTransition()) 179 .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, 180 RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, 181 new AddContainersToBeRemovedFromNMTransition()) 182 183 //Transitions from LOST state 184 .addTransition(NodeState.LOST, NodeState.LOST, 185 RMNodeEventType.RESOURCE_UPDATE, 186 new UpdateNodeResourceWhenUnusableTransition()) 187 .addTransition(NodeState.LOST, NodeState.LOST, 188 RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, 189 new AddContainersToBeRemovedFromNMTransition()) 190 191 //Transitions from UNHEALTHY state 192 .addTransition(NodeState.UNHEALTHY, 193 EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), 194 RMNodeEventType.STATUS_UPDATE, 195 new StatusUpdateWhenUnHealthyTransition()) 196 .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, 197 RMNodeEventType.DECOMMISSION, 198 new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) 199 .addTransition(NodeState.UNHEALTHY, NodeState.LOST, 200 RMNodeEventType.EXPIRE, 201 new DeactivateNodeTransition(NodeState.LOST)) 202 .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED, 203 RMNodeEventType.REBOOTING, 204 new DeactivateNodeTransition(NodeState.REBOOTED)) 205 .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, 206 RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) 207 .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, 208 RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) 209 .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, 210 RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) 211 .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, 212 RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) 213 .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, 214 RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, 215 new AddContainersToBeRemovedFromNMTransition()) 216 217 // create the topology tables 218 .installTopology(); 219 220 private final StateMachine<NodeState, RMNodeEventType, 221 RMNodeEvent> stateMachine; 222 RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion)223 public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, 224 int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { 225 this.nodeId = nodeId; 226 this.context = context; 227 this.hostName = hostName; 228 this.commandPort = cmPort; 229 this.httpPort = httpPort; 230 this.totalCapability = capability; 231 this.nodeAddress = hostName + ":" + cmPort; 232 this.httpAddress = hostName + ":" + httpPort; 233 this.node = node; 234 this.healthReport = "Healthy"; 235 this.lastHealthReportTime = System.currentTimeMillis(); 236 this.nodeManagerVersion = nodeManagerVersion; 237 238 this.latestNodeHeartBeatResponse.setResponseId(0); 239 240 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 241 this.readLock = lock.readLock(); 242 this.writeLock = lock.writeLock(); 243 244 this.stateMachine = stateMachineFactory.make(this); 245 246 this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>(); 247 } 248 249 @Override toString()250 public String toString() { 251 return this.nodeId.toString(); 252 } 253 254 @Override getHostName()255 public String getHostName() { 256 return hostName; 257 } 258 259 @Override getCommandPort()260 public int getCommandPort() { 261 return commandPort; 262 } 263 264 @Override getHttpPort()265 public int getHttpPort() { 266 return httpPort; 267 } 268 269 @Override getNodeID()270 public NodeId getNodeID() { 271 return this.nodeId; 272 } 273 274 @Override getNodeAddress()275 public String getNodeAddress() { 276 return this.nodeAddress; 277 } 278 279 @Override getHttpAddress()280 public String getHttpAddress() { 281 return this.httpAddress; 282 } 283 284 @Override getTotalCapability()285 public Resource getTotalCapability() { 286 return this.totalCapability; 287 } 288 289 @Override getRackName()290 public String getRackName() { 291 return node.getNetworkLocation(); 292 } 293 294 @Override getNode()295 public Node getNode() { 296 return this.node; 297 } 298 299 @Override getHealthReport()300 public String getHealthReport() { 301 this.readLock.lock(); 302 303 try { 304 return this.healthReport; 305 } finally { 306 this.readLock.unlock(); 307 } 308 } 309 setHealthReport(String healthReport)310 public void setHealthReport(String healthReport) { 311 this.writeLock.lock(); 312 313 try { 314 this.healthReport = healthReport; 315 } finally { 316 this.writeLock.unlock(); 317 } 318 } 319 setLastHealthReportTime(long lastHealthReportTime)320 public void setLastHealthReportTime(long lastHealthReportTime) { 321 this.writeLock.lock(); 322 323 try { 324 this.lastHealthReportTime = lastHealthReportTime; 325 } finally { 326 this.writeLock.unlock(); 327 } 328 } 329 330 @Override getLastHealthReportTime()331 public long getLastHealthReportTime() { 332 this.readLock.lock(); 333 334 try { 335 return this.lastHealthReportTime; 336 } finally { 337 this.readLock.unlock(); 338 } 339 } 340 341 @Override getNodeManagerVersion()342 public String getNodeManagerVersion() { 343 return nodeManagerVersion; 344 } 345 346 @Override getState()347 public NodeState getState() { 348 this.readLock.lock(); 349 350 try { 351 return this.stateMachine.getCurrentState(); 352 } finally { 353 this.readLock.unlock(); 354 } 355 } 356 357 @Override getAppsToCleanup()358 public List<ApplicationId> getAppsToCleanup() { 359 this.readLock.lock(); 360 361 try { 362 return new ArrayList<ApplicationId>(this.finishedApplications); 363 } finally { 364 this.readLock.unlock(); 365 } 366 367 } 368 369 @Override getContainersToCleanUp()370 public List<ContainerId> getContainersToCleanUp() { 371 372 this.readLock.lock(); 373 374 try { 375 return new ArrayList<ContainerId>(this.containersToClean); 376 } finally { 377 this.readLock.unlock(); 378 } 379 }; 380 381 @Override updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response)382 public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { 383 this.writeLock.lock(); 384 385 try { 386 response.addAllContainersToCleanup( 387 new ArrayList<ContainerId>(this.containersToClean)); 388 response.addAllApplicationsToCleanup(this.finishedApplications); 389 response.addContainersToBeRemovedFromNM( 390 new ArrayList<ContainerId>(this.containersToBeRemovedFromNM)); 391 this.containersToClean.clear(); 392 this.finishedApplications.clear(); 393 this.containersToBeRemovedFromNM.clear(); 394 } finally { 395 this.writeLock.unlock(); 396 } 397 }; 398 399 @Override getLastNodeHeartBeatResponse()400 public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { 401 402 this.readLock.lock(); 403 404 try { 405 return this.latestNodeHeartBeatResponse; 406 } finally { 407 this.readLock.unlock(); 408 } 409 } 410 411 @Override resetLastNodeHeartBeatResponse()412 public void resetLastNodeHeartBeatResponse() { 413 this.writeLock.lock(); 414 try { 415 latestNodeHeartBeatResponse.setResponseId(0); 416 } finally { 417 this.writeLock.unlock(); 418 } 419 } 420 handle(RMNodeEvent event)421 public void handle(RMNodeEvent event) { 422 LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType()); 423 try { 424 writeLock.lock(); 425 NodeState oldState = getState(); 426 try { 427 stateMachine.doTransition(event.getType(), event); 428 } catch (InvalidStateTransitonException e) { 429 LOG.error("Can't handle this event at current state", e); 430 LOG.error("Invalid event " + event.getType() + 431 " on Node " + this.nodeId); 432 } 433 if (oldState != getState()) { 434 LOG.info(nodeId + " Node Transitioned from " + oldState + " to " 435 + getState()); 436 } 437 } 438 439 finally { 440 writeLock.unlock(); 441 } 442 } 443 updateMetricsForRejoinedNode(NodeState previousNodeState)444 private void updateMetricsForRejoinedNode(NodeState previousNodeState) { 445 ClusterMetrics metrics = ClusterMetrics.getMetrics(); 446 metrics.incrNumActiveNodes(); 447 448 switch (previousNodeState) { 449 case LOST: 450 metrics.decrNumLostNMs(); 451 break; 452 case REBOOTED: 453 metrics.decrNumRebootedNMs(); 454 break; 455 case DECOMMISSIONED: 456 metrics.decrDecommisionedNMs(); 457 break; 458 case UNHEALTHY: 459 metrics.decrNumUnhealthyNMs(); 460 break; 461 default: 462 LOG.debug("Unexpected previous node state"); 463 } 464 } 465 updateMetricsForDeactivatedNode(NodeState initialState, NodeState finalState)466 private void updateMetricsForDeactivatedNode(NodeState initialState, 467 NodeState finalState) { 468 ClusterMetrics metrics = ClusterMetrics.getMetrics(); 469 470 switch (initialState) { 471 case RUNNING: 472 metrics.decrNumActiveNodes(); 473 break; 474 case UNHEALTHY: 475 metrics.decrNumUnhealthyNMs(); 476 break; 477 default: 478 LOG.debug("Unexpected inital state"); 479 } 480 481 switch (finalState) { 482 case DECOMMISSIONED: 483 metrics.incrDecommisionedNMs(); 484 break; 485 case LOST: 486 metrics.incrNumLostNMs(); 487 break; 488 case REBOOTED: 489 metrics.incrNumRebootedNMs(); 490 break; 491 case UNHEALTHY: 492 metrics.incrNumUnhealthyNMs(); 493 break; 494 default: 495 LOG.debug("Unexpected final state"); 496 } 497 } 498 handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context, ApplicationId appId, NodeId nodeId)499 private static void handleRunningAppOnNode(RMNodeImpl rmNode, 500 RMContext context, ApplicationId appId, NodeId nodeId) { 501 RMApp app = context.getRMApps().get(appId); 502 503 // if we failed getting app by appId, maybe something wrong happened, just 504 // add the app to the finishedApplications list so that the app can be 505 // cleaned up on the NM 506 if (null == app) { 507 LOG.warn("Cannot get RMApp by appId=" + appId 508 + ", just added it to finishedApplications list for cleanup"); 509 rmNode.finishedApplications.add(appId); 510 return; 511 } 512 513 context.getDispatcher().getEventHandler() 514 .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); 515 } 516 updateNodeResourceFromEvent(RMNodeImpl rmNode, RMNodeResourceUpdateEvent event)517 private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, 518 RMNodeResourceUpdateEvent event){ 519 ResourceOption resourceOption = event.getResourceOption(); 520 // Set resource on RMNode 521 rmNode.totalCapability = resourceOption.getResource(); 522 } 523 524 public static class AddNodeTransition implements 525 SingleArcTransition<RMNodeImpl, RMNodeEvent> { 526 527 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)528 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 529 // Inform the scheduler 530 RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; 531 List<NMContainerStatus> containers = null; 532 533 String host = rmNode.nodeId.getHost(); 534 if (rmNode.context.getInactiveRMNodes().containsKey(host)) { 535 // Old node rejoining 536 RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host); 537 rmNode.context.getInactiveRMNodes().remove(host); 538 rmNode.updateMetricsForRejoinedNode(previouRMNode.getState()); 539 } else { 540 // Increment activeNodes explicitly because this is a new node. 541 ClusterMetrics.getMetrics().incrNumActiveNodes(); 542 containers = startEvent.getNMContainerStatuses(); 543 if (containers != null && !containers.isEmpty()) { 544 for (NMContainerStatus container : containers) { 545 if (container.getContainerState() == ContainerState.RUNNING) { 546 rmNode.launchedContainers.add(container.getContainerId()); 547 } 548 } 549 } 550 } 551 552 if (null != startEvent.getRunningApplications()) { 553 for (ApplicationId appId : startEvent.getRunningApplications()) { 554 handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); 555 } 556 } 557 558 rmNode.context.getDispatcher().getEventHandler() 559 .handle(new NodeAddedSchedulerEvent(rmNode, containers)); 560 rmNode.context.getDispatcher().getEventHandler().handle( 561 new NodesListManagerEvent( 562 NodesListManagerEventType.NODE_USABLE, rmNode)); 563 } 564 } 565 566 public static class ReconnectNodeTransition implements 567 SingleArcTransition<RMNodeImpl, RMNodeEvent> { 568 569 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)570 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 571 RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; 572 RMNode newNode = reconnectEvent.getReconnectedNode(); 573 rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); 574 List<ApplicationId> runningApps = reconnectEvent.getRunningApplications(); 575 boolean noRunningApps = 576 (runningApps == null) || (runningApps.size() == 0); 577 578 // No application running on the node, so send node-removal event with 579 // cleaning up old container info. 580 if (noRunningApps) { 581 rmNode.nodeUpdateQueue.clear(); 582 rmNode.context.getDispatcher().getEventHandler().handle( 583 new NodeRemovedSchedulerEvent(rmNode)); 584 585 if (rmNode.getHttpPort() == newNode.getHttpPort()) { 586 if (!rmNode.getTotalCapability().equals( 587 newNode.getTotalCapability())) { 588 rmNode.totalCapability = newNode.getTotalCapability(); 589 } 590 if (rmNode.getState().equals(NodeState.RUNNING)) { 591 // Only add old node if old state is RUNNING 592 rmNode.context.getDispatcher().getEventHandler().handle( 593 new NodeAddedSchedulerEvent(rmNode)); 594 } 595 } else { 596 // Reconnected node differs, so replace old node and start new node 597 switch (rmNode.getState()) { 598 case RUNNING: 599 ClusterMetrics.getMetrics().decrNumActiveNodes(); 600 break; 601 case UNHEALTHY: 602 ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); 603 break; 604 default: 605 LOG.debug("Unexpected Rmnode state"); 606 } 607 rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); 608 rmNode.context.getDispatcher().getEventHandler().handle( 609 new RMNodeStartedEvent(newNode.getNodeID(), null, null)); 610 } 611 } else { 612 rmNode.httpPort = newNode.getHttpPort(); 613 rmNode.httpAddress = newNode.getHttpAddress(); 614 boolean isCapabilityChanged = false; 615 if (!rmNode.getTotalCapability().equals( 616 newNode.getTotalCapability())) { 617 rmNode.totalCapability = newNode.getTotalCapability(); 618 isCapabilityChanged = true; 619 } 620 621 handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); 622 623 for (ApplicationId appId : reconnectEvent.getRunningApplications()) { 624 handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); 625 } 626 627 if (isCapabilityChanged 628 && rmNode.getState().equals(NodeState.RUNNING)) { 629 // Update scheduler node's capacity for reconnect node. 630 rmNode.context 631 .getDispatcher() 632 .getEventHandler() 633 .handle( 634 new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption 635 .newInstance(newNode.getTotalCapability(), -1))); 636 } 637 } 638 } 639 handleNMContainerStatus( List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode)640 private void handleNMContainerStatus( 641 List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) { 642 List<ContainerStatus> containerStatuses = 643 new ArrayList<ContainerStatus>(); 644 for (NMContainerStatus nmContainerStatus : nmContainerStatuses) { 645 containerStatuses.add(createContainerStatus(nmContainerStatus)); 646 } 647 rmnode.handleContainerStatus(containerStatuses); 648 } 649 createContainerStatus( NMContainerStatus remoteContainer)650 private ContainerStatus createContainerStatus( 651 NMContainerStatus remoteContainer) { 652 ContainerStatus cStatus = 653 ContainerStatus.newInstance(remoteContainer.getContainerId(), 654 remoteContainer.getContainerState(), 655 remoteContainer.getDiagnostics(), 656 remoteContainer.getContainerExitStatus()); 657 return cStatus; 658 } 659 } 660 661 public static class UpdateNodeResourceWhenRunningTransition 662 implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { 663 664 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)665 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 666 RMNodeResourceUpdateEvent updateEvent = (RMNodeResourceUpdateEvent)event; 667 updateNodeResourceFromEvent(rmNode, updateEvent); 668 // Notify new resourceOption to scheduler 669 rmNode.context.getDispatcher().getEventHandler().handle( 670 new NodeResourceUpdateSchedulerEvent(rmNode, updateEvent.getResourceOption())); 671 } 672 } 673 674 public static class UpdateNodeResourceWhenUnusableTransition 675 implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { 676 677 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)678 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 679 // The node is not usable, only log a warn message 680 LOG.warn("Try to update resource on a "+ rmNode.getState().toString() + 681 " node: "+rmNode.toString()); 682 updateNodeResourceFromEvent(rmNode, (RMNodeResourceUpdateEvent)event); 683 // No need to notify scheduler as schedulerNode is not function now 684 // and can sync later from RMnode. 685 } 686 } 687 688 public static class CleanUpAppTransition 689 implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { 690 691 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)692 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 693 rmNode.finishedApplications.add((( 694 RMNodeCleanAppEvent) event).getAppId()); 695 } 696 } 697 698 public static class CleanUpContainerTransition implements 699 SingleArcTransition<RMNodeImpl, RMNodeEvent> { 700 701 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)702 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 703 rmNode.containersToClean.add((( 704 RMNodeCleanContainerEvent) event).getContainerId()); 705 } 706 } 707 708 public static class AddContainersToBeRemovedFromNMTransition implements 709 SingleArcTransition<RMNodeImpl, RMNodeEvent> { 710 711 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)712 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 713 rmNode.containersToBeRemovedFromNM.addAll((( 714 RMNodeFinishedContainersPulledByAMEvent) event).getContainers()); 715 } 716 } 717 718 public static class DeactivateNodeTransition 719 implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { 720 721 private final NodeState finalState; DeactivateNodeTransition(NodeState finalState)722 public DeactivateNodeTransition(NodeState finalState) { 723 this.finalState = finalState; 724 } 725 726 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)727 public void transition(RMNodeImpl rmNode, RMNodeEvent event) { 728 // Inform the scheduler 729 rmNode.nodeUpdateQueue.clear(); 730 // If the current state is NodeState.UNHEALTHY 731 // Then node is already been removed from the 732 // Scheduler 733 NodeState initialState = rmNode.getState(); 734 if (!initialState.equals(NodeState.UNHEALTHY)) { 735 rmNode.context.getDispatcher().getEventHandler() 736 .handle(new NodeRemovedSchedulerEvent(rmNode)); 737 } 738 rmNode.context.getDispatcher().getEventHandler().handle( 739 new NodesListManagerEvent( 740 NodesListManagerEventType.NODE_UNUSABLE, rmNode)); 741 742 // Deactivate the node 743 rmNode.context.getRMNodes().remove(rmNode.nodeId); 744 LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " 745 + finalState); 746 rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode); 747 748 //Update the metrics 749 rmNode.updateMetricsForDeactivatedNode(initialState, finalState); 750 } 751 } 752 753 public static class StatusUpdateWhenHealthyTransition implements 754 MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> { 755 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)756 public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { 757 758 RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; 759 760 // Switch the last heartbeatresponse. 761 rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); 762 763 NodeHealthStatus remoteNodeHealthStatus = 764 statusEvent.getNodeHealthStatus(); 765 rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); 766 rmNode.setLastHealthReportTime( 767 remoteNodeHealthStatus.getLastHealthReportTime()); 768 if (!remoteNodeHealthStatus.getIsNodeHealthy()) { 769 LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: " 770 + remoteNodeHealthStatus.getHealthReport()); 771 rmNode.nodeUpdateQueue.clear(); 772 // Inform the scheduler 773 rmNode.context.getDispatcher().getEventHandler().handle( 774 new NodeRemovedSchedulerEvent(rmNode)); 775 rmNode.context.getDispatcher().getEventHandler().handle( 776 new NodesListManagerEvent( 777 NodesListManagerEventType.NODE_UNUSABLE, rmNode)); 778 // Update metrics 779 rmNode.updateMetricsForDeactivatedNode(rmNode.getState(), 780 NodeState.UNHEALTHY); 781 return NodeState.UNHEALTHY; 782 } 783 784 rmNode.handleContainerStatus(statusEvent.getContainers()); 785 786 if(rmNode.nextHeartBeat) { 787 rmNode.nextHeartBeat = false; 788 rmNode.context.getDispatcher().getEventHandler().handle( 789 new NodeUpdateSchedulerEvent(rmNode)); 790 } 791 792 // Update DTRenewer in secure mode to keep these apps alive. Today this is 793 // needed for log-aggregation to finish long after the apps are gone. 794 if (UserGroupInformation.isSecurityEnabled()) { 795 rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications( 796 statusEvent.getKeepAliveAppIds()); 797 } 798 799 return NodeState.RUNNING; 800 } 801 } 802 803 public static class StatusUpdateWhenUnHealthyTransition implements 804 MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> { 805 806 @Override transition(RMNodeImpl rmNode, RMNodeEvent event)807 public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { 808 RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; 809 810 // Switch the last heartbeatresponse. 811 rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse(); 812 NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); 813 rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); 814 rmNode.setLastHealthReportTime( 815 remoteNodeHealthStatus.getLastHealthReportTime()); 816 if (remoteNodeHealthStatus.getIsNodeHealthy()) { 817 rmNode.context.getDispatcher().getEventHandler().handle( 818 new NodeAddedSchedulerEvent(rmNode)); 819 rmNode.context.getDispatcher().getEventHandler().handle( 820 new NodesListManagerEvent( 821 NodesListManagerEventType.NODE_USABLE, rmNode)); 822 // ??? how about updating metrics before notifying to ensure that 823 // notifiers get update metadata because they will very likely query it 824 // upon notification 825 // Update metrics 826 rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY); 827 return NodeState.RUNNING; 828 } 829 830 return NodeState.UNHEALTHY; 831 } 832 } 833 834 @Override pullContainerUpdates()835 public List<UpdatedContainerInfo> pullContainerUpdates() { 836 List<UpdatedContainerInfo> latestContainerInfoList = 837 new ArrayList<UpdatedContainerInfo>(); 838 UpdatedContainerInfo containerInfo; 839 while ((containerInfo = nodeUpdateQueue.poll()) != null) { 840 latestContainerInfoList.add(containerInfo); 841 } 842 this.nextHeartBeat = true; 843 return latestContainerInfoList; 844 } 845 846 @VisibleForTesting setNextHeartBeat(boolean nextHeartBeat)847 public void setNextHeartBeat(boolean nextHeartBeat) { 848 this.nextHeartBeat = nextHeartBeat; 849 } 850 851 @VisibleForTesting getQueueSize()852 public int getQueueSize() { 853 return nodeUpdateQueue.size(); 854 } 855 856 // For test only. 857 @VisibleForTesting getLaunchedContainers()858 public Set<ContainerId> getLaunchedContainers() { 859 return this.launchedContainers; 860 } 861 862 @Override getNodeLabels()863 public Set<String> getNodeLabels() { 864 RMNodeLabelsManager nlm = context.getNodeLabelManager(); 865 if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) { 866 return CommonNodeLabelsManager.EMPTY_STRING_SET; 867 } 868 return nlm.getLabelsOnNode(nodeId); 869 } 870 handleContainerStatus(List<ContainerStatus> containerStatuses)871 private void handleContainerStatus(List<ContainerStatus> containerStatuses) { 872 // Filter the map to only obtain just launched containers and finished 873 // containers. 874 List<ContainerStatus> newlyLaunchedContainers = 875 new ArrayList<ContainerStatus>(); 876 List<ContainerStatus> completedContainers = 877 new ArrayList<ContainerStatus>(); 878 for (ContainerStatus remoteContainer : containerStatuses) { 879 ContainerId containerId = remoteContainer.getContainerId(); 880 881 // Don't bother with containers already scheduled for cleanup, or for 882 // applications already killed. The scheduler doens't need to know any 883 // more about this container 884 if (containersToClean.contains(containerId)) { 885 LOG.info("Container " + containerId + " already scheduled for " 886 + "cleanup, no further processing"); 887 continue; 888 } 889 if (finishedApplications.contains(containerId.getApplicationAttemptId() 890 .getApplicationId())) { 891 LOG.info("Container " + containerId 892 + " belongs to an application that is already killed," 893 + " no further processing"); 894 continue; 895 } 896 897 // Process running containers 898 if (remoteContainer.getState() == ContainerState.RUNNING) { 899 if (!launchedContainers.contains(containerId)) { 900 // Just launched container. RM knows about it the first time. 901 launchedContainers.add(containerId); 902 newlyLaunchedContainers.add(remoteContainer); 903 } 904 } else { 905 // A finished container 906 launchedContainers.remove(containerId); 907 completedContainers.add(remoteContainer); 908 } 909 } 910 if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { 911 nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, 912 completedContainers)); 913 } 914 } 915 916 } 917