1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
20 
21 import java.util.ArrayList;
22 import java.util.EnumSet;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.TreeSet;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.locks.ReentrantReadWriteLock;
29 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
30 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
31 
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.classification.InterfaceAudience.Private;
35 import org.apache.hadoop.classification.InterfaceStability.Unstable;
36 import org.apache.hadoop.net.Node;
37 import org.apache.hadoop.security.UserGroupInformation;
38 import org.apache.hadoop.yarn.api.records.ApplicationId;
39 import org.apache.hadoop.yarn.api.records.ContainerId;
40 import org.apache.hadoop.yarn.api.records.ContainerState;
41 import org.apache.hadoop.yarn.api.records.ContainerStatus;
42 import org.apache.hadoop.yarn.api.records.NodeId;
43 import org.apache.hadoop.yarn.api.records.NodeState;
44 import org.apache.hadoop.yarn.api.records.Resource;
45 import org.apache.hadoop.yarn.api.records.ResourceOption;
46 import org.apache.hadoop.yarn.event.EventHandler;
47 import org.apache.hadoop.yarn.factories.RecordFactory;
48 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
49 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
50 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
51 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
52 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
53 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
54 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
55 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
56 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
57 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
58 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
59 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
60 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
61 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
62 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
63 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
64 import org.apache.hadoop.yarn.server.utils.BuilderUtils.ContainerIdComparator;
65 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
66 import org.apache.hadoop.yarn.state.MultipleArcTransition;
67 import org.apache.hadoop.yarn.state.SingleArcTransition;
68 import org.apache.hadoop.yarn.state.StateMachine;
69 import org.apache.hadoop.yarn.state.StateMachineFactory;
70 
71 import com.google.common.annotations.VisibleForTesting;
72 
73 /**
74  * This class is used to keep track of all the applications/containers
75  * running on a node.
76  *
77  */
78 @Private
79 @Unstable
80 @SuppressWarnings("unchecked")
81 public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
82 
  private static final Log LOG = LogFactory.getLog(RMNodeImpl.class);

  // Used to create the initial (placeholder) heartbeat response below.
  private static final RecordFactory recordFactory = RecordFactoryProvider
      .getRecordFactory(null);

  // Two views of one ReentrantReadWriteLock (see constructor). handle() holds
  // writeLock for the whole state transition; the accessors below use
  // readLock / writeLock for the mutable health and cleanup state.
  private final ReadLock readLock;
  private final WriteLock writeLock;

  // Pending container updates for this node. Cleared whenever the node is
  // removed from the scheduler (reconnect, deactivate, became unhealthy);
  // producers/consumers live outside this section of the class.
  private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
  // When true, the next healthy status update sends a NodeUpdateSchedulerEvent
  // and flips this back to false. NOTE(review): presumably re-armed elsewhere
  // once the scheduler has consumed the update — confirm.
  private volatile boolean nextHeartBeat = true;

  private final NodeId nodeId;
  private final RMContext context;
  private final String hostName;
  private final int commandPort;
  // Not final: ReconnectNodeTransition refreshes the HTTP endpoint.
  private int httpPort;
  private final String nodeAddress; // The containerManager address
  private String httpAddress;
  // volatile: replaced by resource-update / reconnect transitions and read by
  // getTotalCapability() without taking a lock.
  private volatile Resource totalCapability;
  private final Node node;

  // Guarded by readLock/writeLock (see getters/setters).
  private String healthReport;
  private long lastHealthReportTime;
  // Written by ReconnectNodeTransition under the write lock but read by
  // getNodeManagerVersion() without a lock. NOTE(review): visibility to
  // other threads is not guaranteed — consider volatile.
  private String nodeManagerVersion;

  /* set of containers that have just launched */
  private final Set<ContainerId> launchedContainers =
    new HashSet<ContainerId>();

  /* set of containers that need to be cleaned */
  // Ordered by ContainerIdComparator; drained into the heartbeat response by
  // updateNodeHeartbeatResponseForCleanup().
  private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
      new ContainerIdComparator());

  /*
   * set of containers to notify NM to remove them from its context. Currently,
   * this includes containers that were notified to AM about their completion
   */
  private final Set<ContainerId> containersToBeRemovedFromNM =
      new HashSet<ContainerId>();

  /* the list of applications that have finished and need to be purged */
  private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();

  // The most recent heartbeat response; replaced on every status update and
  // starts out as an empty record whose response id is set to 0.
  private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
      .newRecordInstance(NodeHeartbeatResponse.class);
128 
  /*
   * Transition table shared by every RMNodeImpl. NEW is the initial state.
   * RUNNING and UNHEALTHY are the active states; DECOMMISSIONED, LOST and
   * REBOOTED are reached through DeactivateNodeTransition. Any event not
   * listed for the current state raises InvalidStateTransitonException,
   * which handle() logs and ignores.
   *
   * RESOURCE_UPDATE is accepted in every state, but only in RUNNING is the
   * scheduler notified; elsewhere only the RMNode's own capacity changes.
   */
  private static final StateMachineFactory<RMNodeImpl,
                                           NodeState,
                                           RMNodeEventType,
                                           RMNodeEvent> stateMachineFactory
                 = new StateMachineFactory<RMNodeImpl,
                                           NodeState,
                                           RMNodeEventType,
                                           RMNodeEvent>(NodeState.NEW)

     //Transitions from NEW state
     .addTransition(NodeState.NEW, NodeState.RUNNING,
         RMNodeEventType.STARTED, new AddNodeTransition())
     .addTransition(NodeState.NEW, NodeState.NEW,
         RMNodeEventType.RESOURCE_UPDATE,
         new UpdateNodeResourceWhenUnusableTransition())

     //Transitions from RUNNING state
     // STATUS_UPDATE may move the node to UNHEALTHY if the NM reports so.
     .addTransition(NodeState.RUNNING,
         EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
     .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
         RMNodeEventType.DECOMMISSION,
         new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
     .addTransition(NodeState.RUNNING, NodeState.LOST,
         RMNodeEventType.EXPIRE,
         new DeactivateNodeTransition(NodeState.LOST))
     .addTransition(NodeState.RUNNING, NodeState.REBOOTED,
         RMNodeEventType.REBOOTING,
         new DeactivateNodeTransition(NodeState.REBOOTED))
     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
         RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
         RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
         new AddContainersToBeRemovedFromNMTransition())
     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
         RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())

     //Transitions from REBOOTED state
     .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
         RMNodeEventType.RESOURCE_UPDATE,
         new UpdateNodeResourceWhenUnusableTransition())

     //Transitions from DECOMMISSIONED state
     .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
         RMNodeEventType.RESOURCE_UPDATE,
         new UpdateNodeResourceWhenUnusableTransition())
     .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
         new AddContainersToBeRemovedFromNMTransition())

     //Transitions from LOST state
     .addTransition(NodeState.LOST, NodeState.LOST,
         RMNodeEventType.RESOURCE_UPDATE,
         new UpdateNodeResourceWhenUnusableTransition())
     .addTransition(NodeState.LOST, NodeState.LOST,
         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
         new AddContainersToBeRemovedFromNMTransition())

     //Transitions from UNHEALTHY state
     // STATUS_UPDATE may bring the node back to RUNNING.
     .addTransition(NodeState.UNHEALTHY,
         EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
         RMNodeEventType.STATUS_UPDATE,
         new StatusUpdateWhenUnHealthyTransition())
     .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
         RMNodeEventType.DECOMMISSION,
         new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
     .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
         RMNodeEventType.EXPIRE,
         new DeactivateNodeTransition(NodeState.LOST))
     .addTransition(NodeState.UNHEALTHY, NodeState.REBOOTED,
         RMNodeEventType.REBOOTING,
         new DeactivateNodeTransition(NodeState.REBOOTED))
     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
         RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
         RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
         RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
         RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
         new AddContainersToBeRemovedFromNMTransition())

     // create the topology tables
     .installTopology();

  // Per-instance state machine built from the shared factory above.
  private final StateMachine<NodeState, RMNodeEventType,
                             RMNodeEvent> stateMachine;
222 
RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion)223   public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
224       int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
225     this.nodeId = nodeId;
226     this.context = context;
227     this.hostName = hostName;
228     this.commandPort = cmPort;
229     this.httpPort = httpPort;
230     this.totalCapability = capability;
231     this.nodeAddress = hostName + ":" + cmPort;
232     this.httpAddress = hostName + ":" + httpPort;
233     this.node = node;
234     this.healthReport = "Healthy";
235     this.lastHealthReportTime = System.currentTimeMillis();
236     this.nodeManagerVersion = nodeManagerVersion;
237 
238     this.latestNodeHeartBeatResponse.setResponseId(0);
239 
240     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
241     this.readLock = lock.readLock();
242     this.writeLock = lock.writeLock();
243 
244     this.stateMachine = stateMachineFactory.make(this);
245 
246     this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
247   }
248 
249   @Override
toString()250   public String toString() {
251     return this.nodeId.toString();
252   }
253 
254   @Override
getHostName()255   public String getHostName() {
256     return hostName;
257   }
258 
259   @Override
getCommandPort()260   public int getCommandPort() {
261     return commandPort;
262   }
263 
264   @Override
getHttpPort()265   public int getHttpPort() {
266     return httpPort;
267   }
268 
269   @Override
getNodeID()270   public NodeId getNodeID() {
271     return this.nodeId;
272   }
273 
274   @Override
getNodeAddress()275   public String getNodeAddress() {
276     return this.nodeAddress;
277   }
278 
279   @Override
getHttpAddress()280   public String getHttpAddress() {
281     return this.httpAddress;
282   }
283 
284   @Override
getTotalCapability()285   public Resource getTotalCapability() {
286     return this.totalCapability;
287   }
288 
289   @Override
getRackName()290   public String getRackName() {
291     return node.getNetworkLocation();
292   }
293 
294   @Override
getNode()295   public Node getNode() {
296     return this.node;
297   }
298 
299   @Override
getHealthReport()300   public String getHealthReport() {
301     this.readLock.lock();
302 
303     try {
304       return this.healthReport;
305     } finally {
306       this.readLock.unlock();
307     }
308   }
309 
setHealthReport(String healthReport)310   public void setHealthReport(String healthReport) {
311     this.writeLock.lock();
312 
313     try {
314       this.healthReport = healthReport;
315     } finally {
316       this.writeLock.unlock();
317     }
318   }
319 
setLastHealthReportTime(long lastHealthReportTime)320   public void setLastHealthReportTime(long lastHealthReportTime) {
321     this.writeLock.lock();
322 
323     try {
324       this.lastHealthReportTime = lastHealthReportTime;
325     } finally {
326       this.writeLock.unlock();
327     }
328   }
329 
330   @Override
getLastHealthReportTime()331   public long getLastHealthReportTime() {
332     this.readLock.lock();
333 
334     try {
335       return this.lastHealthReportTime;
336     } finally {
337       this.readLock.unlock();
338     }
339   }
340 
341   @Override
getNodeManagerVersion()342   public String getNodeManagerVersion() {
343     return nodeManagerVersion;
344   }
345 
346   @Override
getState()347   public NodeState getState() {
348     this.readLock.lock();
349 
350     try {
351       return this.stateMachine.getCurrentState();
352     } finally {
353       this.readLock.unlock();
354     }
355   }
356 
357   @Override
getAppsToCleanup()358   public List<ApplicationId> getAppsToCleanup() {
359     this.readLock.lock();
360 
361     try {
362       return new ArrayList<ApplicationId>(this.finishedApplications);
363     } finally {
364       this.readLock.unlock();
365     }
366 
367   }
368 
369   @Override
getContainersToCleanUp()370   public List<ContainerId> getContainersToCleanUp() {
371 
372     this.readLock.lock();
373 
374     try {
375       return new ArrayList<ContainerId>(this.containersToClean);
376     } finally {
377       this.readLock.unlock();
378     }
379   };
380 
381   @Override
updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response)382   public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
383     this.writeLock.lock();
384 
385     try {
386       response.addAllContainersToCleanup(
387           new ArrayList<ContainerId>(this.containersToClean));
388       response.addAllApplicationsToCleanup(this.finishedApplications);
389       response.addContainersToBeRemovedFromNM(
390           new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
391       this.containersToClean.clear();
392       this.finishedApplications.clear();
393       this.containersToBeRemovedFromNM.clear();
394     } finally {
395       this.writeLock.unlock();
396     }
397   };
398 
399   @Override
getLastNodeHeartBeatResponse()400   public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
401 
402     this.readLock.lock();
403 
404     try {
405       return this.latestNodeHeartBeatResponse;
406     } finally {
407       this.readLock.unlock();
408     }
409   }
410 
411   @Override
resetLastNodeHeartBeatResponse()412   public void resetLastNodeHeartBeatResponse() {
413     this.writeLock.lock();
414     try {
415       latestNodeHeartBeatResponse.setResponseId(0);
416     } finally {
417       this.writeLock.unlock();
418     }
419   }
420 
handle(RMNodeEvent event)421   public void handle(RMNodeEvent event) {
422     LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
423     try {
424       writeLock.lock();
425       NodeState oldState = getState();
426       try {
427          stateMachine.doTransition(event.getType(), event);
428       } catch (InvalidStateTransitonException e) {
429         LOG.error("Can't handle this event at current state", e);
430         LOG.error("Invalid event " + event.getType() +
431             " on Node  " + this.nodeId);
432       }
433       if (oldState != getState()) {
434         LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
435                  + getState());
436       }
437     }
438 
439     finally {
440       writeLock.unlock();
441     }
442   }
443 
updateMetricsForRejoinedNode(NodeState previousNodeState)444   private void updateMetricsForRejoinedNode(NodeState previousNodeState) {
445     ClusterMetrics metrics = ClusterMetrics.getMetrics();
446     metrics.incrNumActiveNodes();
447 
448     switch (previousNodeState) {
449     case LOST:
450       metrics.decrNumLostNMs();
451       break;
452     case REBOOTED:
453       metrics.decrNumRebootedNMs();
454       break;
455     case DECOMMISSIONED:
456       metrics.decrDecommisionedNMs();
457       break;
458     case UNHEALTHY:
459       metrics.decrNumUnhealthyNMs();
460       break;
461     default:
462       LOG.debug("Unexpected previous node state");
463     }
464   }
465 
updateMetricsForDeactivatedNode(NodeState initialState, NodeState finalState)466   private void updateMetricsForDeactivatedNode(NodeState initialState,
467                                                NodeState finalState) {
468     ClusterMetrics metrics = ClusterMetrics.getMetrics();
469 
470     switch (initialState) {
471       case RUNNING:
472         metrics.decrNumActiveNodes();
473         break;
474       case UNHEALTHY:
475         metrics.decrNumUnhealthyNMs();
476         break;
477       default:
478         LOG.debug("Unexpected inital state");
479     }
480 
481     switch (finalState) {
482     case DECOMMISSIONED:
483         metrics.incrDecommisionedNMs();
484       break;
485     case LOST:
486       metrics.incrNumLostNMs();
487       break;
488     case REBOOTED:
489       metrics.incrNumRebootedNMs();
490       break;
491     case UNHEALTHY:
492       metrics.incrNumUnhealthyNMs();
493       break;
494     default:
495       LOG.debug("Unexpected final state");
496     }
497   }
498 
handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context, ApplicationId appId, NodeId nodeId)499   private static void handleRunningAppOnNode(RMNodeImpl rmNode,
500       RMContext context, ApplicationId appId, NodeId nodeId) {
501     RMApp app = context.getRMApps().get(appId);
502 
503     // if we failed getting app by appId, maybe something wrong happened, just
504     // add the app to the finishedApplications list so that the app can be
505     // cleaned up on the NM
506     if (null == app) {
507       LOG.warn("Cannot get RMApp by appId=" + appId
508           + ", just added it to finishedApplications list for cleanup");
509       rmNode.finishedApplications.add(appId);
510       return;
511     }
512 
513     context.getDispatcher().getEventHandler()
514         .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
515   }
516 
updateNodeResourceFromEvent(RMNodeImpl rmNode, RMNodeResourceUpdateEvent event)517   private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
518      RMNodeResourceUpdateEvent event){
519       ResourceOption resourceOption = event.getResourceOption();
520       // Set resource on RMNode
521       rmNode.totalCapability = resourceOption.getResource();
522   }
523 
524   public static class AddNodeTransition implements
525       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
526 
527     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)528     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
529       // Inform the scheduler
530       RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
531       List<NMContainerStatus> containers = null;
532 
533       String host = rmNode.nodeId.getHost();
534       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
535         // Old node rejoining
536         RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host);
537         rmNode.context.getInactiveRMNodes().remove(host);
538         rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
539       } else {
540         // Increment activeNodes explicitly because this is a new node.
541         ClusterMetrics.getMetrics().incrNumActiveNodes();
542         containers = startEvent.getNMContainerStatuses();
543         if (containers != null && !containers.isEmpty()) {
544           for (NMContainerStatus container : containers) {
545             if (container.getContainerState() == ContainerState.RUNNING) {
546               rmNode.launchedContainers.add(container.getContainerId());
547             }
548           }
549         }
550       }
551 
552       if (null != startEvent.getRunningApplications()) {
553         for (ApplicationId appId : startEvent.getRunningApplications()) {
554           handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
555         }
556       }
557 
558       rmNode.context.getDispatcher().getEventHandler()
559         .handle(new NodeAddedSchedulerEvent(rmNode, containers));
560       rmNode.context.getDispatcher().getEventHandler().handle(
561         new NodesListManagerEvent(
562             NodesListManagerEventType.NODE_USABLE, rmNode));
563     }
564   }
565 
566   public static class ReconnectNodeTransition implements
567       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
568 
569     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)570     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
571       RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event;
572       RMNode newNode = reconnectEvent.getReconnectedNode();
573       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
574       List<ApplicationId> runningApps = reconnectEvent.getRunningApplications();
575       boolean noRunningApps =
576           (runningApps == null) || (runningApps.size() == 0);
577 
578       // No application running on the node, so send node-removal event with
579       // cleaning up old container info.
580       if (noRunningApps) {
581         rmNode.nodeUpdateQueue.clear();
582         rmNode.context.getDispatcher().getEventHandler().handle(
583             new NodeRemovedSchedulerEvent(rmNode));
584 
585         if (rmNode.getHttpPort() == newNode.getHttpPort()) {
586           if (!rmNode.getTotalCapability().equals(
587               newNode.getTotalCapability())) {
588             rmNode.totalCapability = newNode.getTotalCapability();
589           }
590           if (rmNode.getState().equals(NodeState.RUNNING)) {
591             // Only add old node if old state is RUNNING
592             rmNode.context.getDispatcher().getEventHandler().handle(
593                 new NodeAddedSchedulerEvent(rmNode));
594           }
595         } else {
596           // Reconnected node differs, so replace old node and start new node
597           switch (rmNode.getState()) {
598             case RUNNING:
599               ClusterMetrics.getMetrics().decrNumActiveNodes();
600               break;
601             case UNHEALTHY:
602               ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
603               break;
604             default:
605               LOG.debug("Unexpected Rmnode state");
606             }
607             rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
608             rmNode.context.getDispatcher().getEventHandler().handle(
609                 new RMNodeStartedEvent(newNode.getNodeID(), null, null));
610         }
611       } else {
612         rmNode.httpPort = newNode.getHttpPort();
613         rmNode.httpAddress = newNode.getHttpAddress();
614         boolean isCapabilityChanged = false;
615         if (!rmNode.getTotalCapability().equals(
616             newNode.getTotalCapability())) {
617           rmNode.totalCapability = newNode.getTotalCapability();
618           isCapabilityChanged = true;
619         }
620 
621         handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
622 
623         for (ApplicationId appId : reconnectEvent.getRunningApplications()) {
624           handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
625         }
626 
627         if (isCapabilityChanged
628             && rmNode.getState().equals(NodeState.RUNNING)) {
629           // Update scheduler node's capacity for reconnect node.
630           rmNode.context
631               .getDispatcher()
632               .getEventHandler()
633               .handle(
634                   new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
635                       .newInstance(newNode.getTotalCapability(), -1)));
636         }
637       }
638     }
639 
handleNMContainerStatus( List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode)640     private void handleNMContainerStatus(
641         List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
642       List<ContainerStatus> containerStatuses =
643           new ArrayList<ContainerStatus>();
644       for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
645         containerStatuses.add(createContainerStatus(nmContainerStatus));
646       }
647       rmnode.handleContainerStatus(containerStatuses);
648     }
649 
createContainerStatus( NMContainerStatus remoteContainer)650     private ContainerStatus createContainerStatus(
651         NMContainerStatus remoteContainer) {
652       ContainerStatus cStatus =
653           ContainerStatus.newInstance(remoteContainer.getContainerId(),
654               remoteContainer.getContainerState(),
655               remoteContainer.getDiagnostics(),
656               remoteContainer.getContainerExitStatus());
657       return cStatus;
658     }
659   }
660 
661   public static class UpdateNodeResourceWhenRunningTransition
662       implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
663 
664     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)665     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
666       RMNodeResourceUpdateEvent updateEvent = (RMNodeResourceUpdateEvent)event;
667       updateNodeResourceFromEvent(rmNode, updateEvent);
668       // Notify new resourceOption to scheduler
669       rmNode.context.getDispatcher().getEventHandler().handle(
670           new NodeResourceUpdateSchedulerEvent(rmNode, updateEvent.getResourceOption()));
671     }
672   }
673 
674   public static class UpdateNodeResourceWhenUnusableTransition
675       implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
676 
677     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)678     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
679       // The node is not usable, only log a warn message
680       LOG.warn("Try to update resource on a "+ rmNode.getState().toString() +
681           " node: "+rmNode.toString());
682       updateNodeResourceFromEvent(rmNode, (RMNodeResourceUpdateEvent)event);
683       // No need to notify scheduler as schedulerNode is not function now
684       // and can sync later from RMnode.
685     }
686   }
687 
688   public static class CleanUpAppTransition
689     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
690 
691     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)692     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
693       rmNode.finishedApplications.add(((
694           RMNodeCleanAppEvent) event).getAppId());
695     }
696   }
697 
698   public static class CleanUpContainerTransition implements
699       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
700 
701     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)702     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
703       rmNode.containersToClean.add(((
704           RMNodeCleanContainerEvent) event).getContainerId());
705     }
706   }
707 
708   public static class AddContainersToBeRemovedFromNMTransition implements
709       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
710 
711     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)712     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
713       rmNode.containersToBeRemovedFromNM.addAll(((
714           RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
715     }
716   }
717 
718   public static class DeactivateNodeTransition
719     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
720 
721     private final NodeState finalState;
DeactivateNodeTransition(NodeState finalState)722     public DeactivateNodeTransition(NodeState finalState) {
723       this.finalState = finalState;
724     }
725 
726     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)727     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
728       // Inform the scheduler
729       rmNode.nodeUpdateQueue.clear();
730       // If the current state is NodeState.UNHEALTHY
731       // Then node is already been removed from the
732       // Scheduler
733       NodeState initialState = rmNode.getState();
734       if (!initialState.equals(NodeState.UNHEALTHY)) {
735         rmNode.context.getDispatcher().getEventHandler()
736           .handle(new NodeRemovedSchedulerEvent(rmNode));
737       }
738       rmNode.context.getDispatcher().getEventHandler().handle(
739           new NodesListManagerEvent(
740               NodesListManagerEventType.NODE_UNUSABLE, rmNode));
741 
742       // Deactivate the node
743       rmNode.context.getRMNodes().remove(rmNode.nodeId);
744       LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
745           + finalState);
746       rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
747 
748       //Update the metrics
749       rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
750     }
751   }
752 
753   public static class StatusUpdateWhenHealthyTransition implements
754       MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
755     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)756     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
757 
758       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
759 
760       // Switch the last heartbeatresponse.
761       rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
762 
763       NodeHealthStatus remoteNodeHealthStatus =
764           statusEvent.getNodeHealthStatus();
765       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
766       rmNode.setLastHealthReportTime(
767           remoteNodeHealthStatus.getLastHealthReportTime());
768       if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
769         LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: "
770             + remoteNodeHealthStatus.getHealthReport());
771         rmNode.nodeUpdateQueue.clear();
772         // Inform the scheduler
773         rmNode.context.getDispatcher().getEventHandler().handle(
774             new NodeRemovedSchedulerEvent(rmNode));
775         rmNode.context.getDispatcher().getEventHandler().handle(
776             new NodesListManagerEvent(
777                 NodesListManagerEventType.NODE_UNUSABLE, rmNode));
778         // Update metrics
779         rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
780             NodeState.UNHEALTHY);
781         return NodeState.UNHEALTHY;
782       }
783 
784       rmNode.handleContainerStatus(statusEvent.getContainers());
785 
786       if(rmNode.nextHeartBeat) {
787         rmNode.nextHeartBeat = false;
788         rmNode.context.getDispatcher().getEventHandler().handle(
789             new NodeUpdateSchedulerEvent(rmNode));
790       }
791 
792       // Update DTRenewer in secure mode to keep these apps alive. Today this is
793       // needed for log-aggregation to finish long after the apps are gone.
794       if (UserGroupInformation.isSecurityEnabled()) {
795         rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
796           statusEvent.getKeepAliveAppIds());
797       }
798 
799       return NodeState.RUNNING;
800     }
801   }
802 
803   public static class StatusUpdateWhenUnHealthyTransition implements
804       MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
805 
806     @Override
transition(RMNodeImpl rmNode, RMNodeEvent event)807     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
808       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
809 
810       // Switch the last heartbeatresponse.
811       rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
812       NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
813       rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
814       rmNode.setLastHealthReportTime(
815           remoteNodeHealthStatus.getLastHealthReportTime());
816       if (remoteNodeHealthStatus.getIsNodeHealthy()) {
817         rmNode.context.getDispatcher().getEventHandler().handle(
818             new NodeAddedSchedulerEvent(rmNode));
819         rmNode.context.getDispatcher().getEventHandler().handle(
820                 new NodesListManagerEvent(
821                     NodesListManagerEventType.NODE_USABLE, rmNode));
822         // ??? how about updating metrics before notifying to ensure that
823         // notifiers get update metadata because they will very likely query it
824         // upon notification
825         // Update metrics
826         rmNode.updateMetricsForRejoinedNode(NodeState.UNHEALTHY);
827         return NodeState.RUNNING;
828       }
829 
830       return NodeState.UNHEALTHY;
831     }
832   }
833 
834   @Override
pullContainerUpdates()835   public List<UpdatedContainerInfo> pullContainerUpdates() {
836     List<UpdatedContainerInfo> latestContainerInfoList =
837         new ArrayList<UpdatedContainerInfo>();
838     UpdatedContainerInfo containerInfo;
839     while ((containerInfo = nodeUpdateQueue.poll()) != null) {
840       latestContainerInfoList.add(containerInfo);
841     }
842     this.nextHeartBeat = true;
843     return latestContainerInfoList;
844   }
845 
846   @VisibleForTesting
setNextHeartBeat(boolean nextHeartBeat)847   public void setNextHeartBeat(boolean nextHeartBeat) {
848     this.nextHeartBeat = nextHeartBeat;
849   }
850 
851   @VisibleForTesting
getQueueSize()852   public int getQueueSize() {
853     return nodeUpdateQueue.size();
854   }
855 
856   // For test only.
857   @VisibleForTesting
getLaunchedContainers()858   public Set<ContainerId> getLaunchedContainers() {
859     return this.launchedContainers;
860   }
861 
862   @Override
getNodeLabels()863   public Set<String> getNodeLabels() {
864     RMNodeLabelsManager nlm = context.getNodeLabelManager();
865     if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) {
866       return CommonNodeLabelsManager.EMPTY_STRING_SET;
867     }
868     return nlm.getLabelsOnNode(nodeId);
869   }
870 
handleContainerStatus(List<ContainerStatus> containerStatuses)871   private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
872     // Filter the map to only obtain just launched containers and finished
873     // containers.
874     List<ContainerStatus> newlyLaunchedContainers =
875         new ArrayList<ContainerStatus>();
876     List<ContainerStatus> completedContainers =
877         new ArrayList<ContainerStatus>();
878     for (ContainerStatus remoteContainer : containerStatuses) {
879       ContainerId containerId = remoteContainer.getContainerId();
880 
881       // Don't bother with containers already scheduled for cleanup, or for
882       // applications already killed. The scheduler doens't need to know any
883       // more about this container
884       if (containersToClean.contains(containerId)) {
885         LOG.info("Container " + containerId + " already scheduled for "
886             + "cleanup, no further processing");
887         continue;
888       }
889       if (finishedApplications.contains(containerId.getApplicationAttemptId()
890           .getApplicationId())) {
891         LOG.info("Container " + containerId
892             + " belongs to an application that is already killed,"
893             + " no further processing");
894         continue;
895       }
896 
897       // Process running containers
898       if (remoteContainer.getState() == ContainerState.RUNNING) {
899         if (!launchedContainers.contains(containerId)) {
900           // Just launched container. RM knows about it the first time.
901           launchedContainers.add(containerId);
902           newlyLaunchedContainers.add(remoteContainer);
903         }
904       } else {
905         // A finished container
906         launchedContainers.remove(containerId);
907         completedContainers.add(remoteContainer);
908       }
909     }
910     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
911       nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
912           completedContainers));
913     }
914   }
915 
916  }
917