1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server.nodemanager;
20 
21 import java.io.IOException;
22 import java.net.ConnectException;
23 import java.nio.ByteBuffer;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Random;
34 import java.util.Set;
35 
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.classification.InterfaceAudience.Private;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.io.DataInputByteBuffer;
41 import org.apache.hadoop.ipc.RPC;
42 import org.apache.hadoop.security.Credentials;
43 import org.apache.hadoop.security.UserGroupInformation;
44 import org.apache.hadoop.service.AbstractService;
45 import org.apache.hadoop.util.VersionUtil;
46 import org.apache.hadoop.yarn.api.records.ApplicationId;
47 import org.apache.hadoop.yarn.api.records.ContainerId;
48 import org.apache.hadoop.yarn.api.records.ContainerState;
49 import org.apache.hadoop.yarn.api.records.ContainerStatus;
50 import org.apache.hadoop.yarn.api.records.NodeId;
51 import org.apache.hadoop.yarn.api.records.Resource;
52 import org.apache.hadoop.yarn.conf.YarnConfiguration;
53 import org.apache.hadoop.yarn.event.Dispatcher;
54 import org.apache.hadoop.yarn.exceptions.YarnException;
55 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
56 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
57 import org.apache.hadoop.yarn.server.api.ResourceTracker;
58 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
59 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
60 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
61 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
62 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
63 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
64 import org.apache.hadoop.yarn.server.api.records.MasterKey;
65 import org.apache.hadoop.yarn.server.api.records.NodeAction;
66 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
67 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
68 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
69 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
70 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
71 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
72 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
73 import org.apache.hadoop.yarn.util.YarnVersionInfo;
74 
75 import com.google.common.annotations.VisibleForTesting;
76 
77 public class NodeStatusUpdaterImpl extends AbstractService implements
78     NodeStatusUpdater {
79 
  /**
   * Configuration key for how long, in milliseconds, a stopped container is
   * remembered in {@link #recentlyStoppedContainers}.
   */
  public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS =
      YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers";

  private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);

  // Monitor the heartbeat loop waits on between heartbeats; notified by
  // sendOutofBandHeartBeat() to cut the wait short.
  private final Object heartbeatMonitor = new Object();

  private final Context context;
  private final Dispatcher dispatcher;

  // Assigned from the context in serviceStart(); not set before that.
  private NodeId nodeId;
  // Wait time before the next heartbeat, refreshed from every heartbeat
  // response.
  private long nextHeartBeatInterval;
  // RM client proxy, created in serviceStart() and released in stopRMProxy().
  private ResourceTracker resourceTracker;
  // Memory and virtual cores read from the configuration in serviceInit().
  private Resource totalResource;
  private int httpPort;
  private String nodeManagerVersionId;
  // "NONE" disables the RM version check; "EqualToNM" is replaced by the NM's
  // own version on the first registration.
  private String minimumResourceManagerVersion;
  // Checked by the heartbeat loop at the top of every iteration.
  private volatile boolean isStopped;
  private boolean tokenKeepAliveEnabled;
  private long tokenRemovalDelayMs;
  /** Keeps track of when the next keep alive request should be sent for an app*/
  private Map<ApplicationId, Long> appTokenKeepAliveMap =
      new HashMap<ApplicationId, Long>();
  // Source of the jitter applied to keep-alive times in trackAppForKeepAlive().
  private Random keepAliveDelayRandom = new Random();
  // It will be used to track recently stopped containers on node manager, this
  // is to avoid the misleading no-such-container exception messages on NM, when
  // the AM finishes it informs the RM to stop the may-be-already-completed
  // containers.
  // Values are expiry timestamps (milliseconds); all access in this class is
  // synchronized on the map itself.
  private final Map<ContainerId, Long> recentlyStoppedContainers;
  // Save the reported completed containers in case of lost heartbeat responses.
  // These completed containers will be sent again till a successful response.
  private final Map<ContainerId, ContainerStatus> pendingCompletedContainers;
  // Duration for which to track recently stopped container.
  private long durationToTrackStoppedContainers;

  private final NodeHealthCheckerService healthChecker;
  private final NodeManagerMetrics metrics;

  // The heartbeat loop body; kept so that a fresh thread can be started around
  // it again after a re-registration.
  private Runnable statusUpdaterRunnable;
  private Thread  statusUpdater;
  private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
  // Container ids passed to removeOrTrackCompletedContainersFromContext() that
  // could not be dropped yet because the container had not reached DONE.
  Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
122 
NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics)123   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
124       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
125     super(NodeStatusUpdaterImpl.class.getName());
126     this.healthChecker = healthChecker;
127     this.context = context;
128     this.dispatcher = dispatcher;
129     this.metrics = metrics;
130     this.recentlyStoppedContainers =
131         new LinkedHashMap<ContainerId, Long>();
132     this.pendingCompletedContainers =
133         new HashMap<ContainerId, ContainerStatus>();
134   }
135 
136   @Override
serviceInit(Configuration conf)137   protected void serviceInit(Configuration conf) throws Exception {
138     int memoryMb =
139         conf.getInt(
140             YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
141     float vMemToPMem =
142         conf.getFloat(
143             YarnConfiguration.NM_VMEM_PMEM_RATIO,
144             YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
145     int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
146 
147     int virtualCores =
148         conf.getInt(
149             YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
150 
151     this.totalResource = Resource.newInstance(memoryMb, virtualCores);
152     metrics.addResource(totalResource);
153     this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
154     this.tokenRemovalDelayMs =
155         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
156             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
157 
158     this.minimumResourceManagerVersion = conf.get(
159         YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
160         YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
161 
162     // Default duration to track stopped containers on nodemanager is 10Min.
163     // This should not be assigned very large value as it will remember all the
164     // containers stopped during that time.
165     durationToTrackStoppedContainers =
166         conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
167           600000);
168     if (durationToTrackStoppedContainers < 0) {
169       String message = "Invalid configuration for "
170         + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
171           + "value is 10Min(600000).";
172       LOG.error(message);
173       throw new YarnException(message);
174     }
175     if (LOG.isDebugEnabled()) {
176       LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
177         + durationToTrackStoppedContainers);
178     }
179     super.serviceInit(conf);
180     LOG.info("Initialized nodemanager for " + nodeId + ":" +
181         " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
182         " virtual-cores=" + virtualCores);
183   }
184 
185   @Override
serviceStart()186   protected void serviceStart() throws Exception {
187 
188     // NodeManager is the last service to start, so NodeId is available.
189     this.nodeId = this.context.getNodeId();
190     this.httpPort = this.context.getHttpPort();
191     this.nodeManagerVersionId = YarnVersionInfo.getVersion();
192     try {
193       // Registration has to be in start so that ContainerManager can get the
194       // perNM tokens needed to authenticate ContainerTokens.
195       this.resourceTracker = getRMClient();
196       registerWithRM();
197       super.serviceStart();
198       startStatusUpdater();
199     } catch (Exception e) {
200       String errorMessage = "Unexpected error starting NodeStatusUpdater";
201       LOG.error(errorMessage, e);
202       throw new YarnRuntimeException(e);
203     }
204   }
205 
206   @Override
serviceStop()207   protected void serviceStop() throws Exception {
208     // Interrupt the updater.
209     this.isStopped = true;
210     stopRMProxy();
211     super.serviceStop();
212   }
213 
rebootNodeStatusUpdaterAndRegisterWithRM()214   protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
215     // Interrupt the updater.
216     this.isStopped = true;
217 
218     try {
219       statusUpdater.join();
220       registerWithRM();
221       statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
222       this.isStopped = false;
223       statusUpdater.start();
224       LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
225     } catch (Exception e) {
226       String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
227       LOG.error(errorMessage, e);
228       throw new YarnRuntimeException(e);
229     }
230   }
231 
232   @VisibleForTesting
stopRMProxy()233   protected void stopRMProxy() {
234     if(this.resourceTracker != null) {
235       RPC.stopProxy(this.resourceTracker);
236     }
237   }
238 
239   @Private
isTokenKeepAliveEnabled(Configuration conf)240   protected boolean isTokenKeepAliveEnabled(Configuration conf) {
241     return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
242         YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
243         && UserGroupInformation.isSecurityEnabled();
244   }
245 
246   @VisibleForTesting
getRMClient()247   protected ResourceTracker getRMClient() throws IOException {
248     Configuration conf = getConfig();
249     return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
250   }
251 
252   @VisibleForTesting
registerWithRM()253   protected void registerWithRM()
254       throws YarnException, IOException {
255     List<NMContainerStatus> containerReports = getNMContainerStatuses();
256     RegisterNodeManagerRequest request =
257         RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
258           nodeManagerVersionId, containerReports, getRunningApplications());
259     if (containerReports != null) {
260       LOG.info("Registering with RM using containers :" + containerReports);
261     }
262     RegisterNodeManagerResponse regNMResponse =
263         resourceTracker.registerNodeManager(request);
264     this.rmIdentifier = regNMResponse.getRMIdentifier();
265     // if the Resourcemanager instructs NM to shutdown.
266     if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
267       String message =
268           "Message from ResourceManager: "
269               + regNMResponse.getDiagnosticsMessage();
270       throw new YarnRuntimeException(
271         "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
272             + message);
273     }
274 
275     // if ResourceManager version is too old then shutdown
276     if (!minimumResourceManagerVersion.equals("NONE")){
277       if (minimumResourceManagerVersion.equals("EqualToNM")){
278         minimumResourceManagerVersion = nodeManagerVersionId;
279       }
280       String rmVersion = regNMResponse.getRMVersion();
281       if (rmVersion == null) {
282         String message = "The Resource Manager's did not return a version. "
283             + "Valid version cannot be checked.";
284         throw new YarnRuntimeException("Shutting down the Node Manager. "
285             + message);
286       }
287       if (VersionUtil.compareVersions(rmVersion,minimumResourceManagerVersion) < 0) {
288         String message = "The Resource Manager's version ("
289             + rmVersion +") is less than the minimum "
290             + "allowed version " + minimumResourceManagerVersion;
291         throw new YarnRuntimeException("Shutting down the Node Manager on RM "
292             + "version error, " + message);
293       }
294     }
295     MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
296     // do this now so that its set before we start heartbeating to RM
297     // It is expected that status updater is started by this point and
298     // RM gives the shared secret in registration during
299     // StatusUpdater#start().
300     if (masterKey != null) {
301       this.context.getContainerTokenSecretManager().setMasterKey(masterKey);
302     }
303 
304     masterKey = regNMResponse.getNMTokenMasterKey();
305     if (masterKey != null) {
306       this.context.getNMTokenSecretManager().setMasterKey(masterKey);
307     }
308 
309     LOG.info("Registered with ResourceManager as " + this.nodeId
310         + " with total resource of " + this.totalResource);
311     LOG.info("Notifying ContainerManager to unblock new container-requests");
312     ((ContainerManagerImpl) this.context.getContainerManager())
313       .setBlockNewContainerRequests(false);
314   }
315 
createKeepAliveApplicationList()316   private List<ApplicationId> createKeepAliveApplicationList() {
317     if (!tokenKeepAliveEnabled) {
318       return Collections.emptyList();
319     }
320 
321     List<ApplicationId> appList = new ArrayList<ApplicationId>();
322     for (Iterator<Entry<ApplicationId, Long>> i =
323         this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
324       Entry<ApplicationId, Long> e = i.next();
325       ApplicationId appId = e.getKey();
326       Long nextKeepAlive = e.getValue();
327       if (!this.context.getApplications().containsKey(appId)) {
328         // Remove if the application has finished.
329         i.remove();
330       } else if (System.currentTimeMillis() > nextKeepAlive) {
331         // KeepAlive list for the next hearbeat.
332         appList.add(appId);
333         trackAppForKeepAlive(appId);
334       }
335     }
336     return appList;
337   }
338 
getNodeStatus(int responseId)339   private NodeStatus getNodeStatus(int responseId) throws IOException {
340 
341     NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
342     nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
343     nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
344     nodeHealthStatus.setLastHealthReportTime(healthChecker
345       .getLastHealthReportTime());
346     if (LOG.isDebugEnabled()) {
347       LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
348           + ", " + nodeHealthStatus.getHealthReport());
349     }
350     List<ContainerStatus> containersStatuses = getContainerStatuses();
351     NodeStatus nodeStatus =
352         NodeStatus.newInstance(nodeId, responseId, containersStatuses,
353           createKeepAliveApplicationList(), nodeHealthStatus);
354 
355     return nodeStatus;
356   }
357 
358   // Iterate through the NMContext and clone and get all the containers'
359   // statuses. If it's a completed container, add into the
360   // recentlyStoppedContainers collections.
361   @VisibleForTesting
getContainerStatuses()362   protected List<ContainerStatus> getContainerStatuses() throws IOException {
363     List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
364     for (Container container : this.context.getContainers().values()) {
365       ContainerId containerId = container.getContainerId();
366       ApplicationId applicationId = containerId.getApplicationAttemptId()
367           .getApplicationId();
368       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
369           container.cloneAndGetContainerStatus();
370       if (containerStatus.getState() == ContainerState.COMPLETE) {
371         if (isApplicationStopped(applicationId)) {
372           if (LOG.isDebugEnabled()) {
373             LOG.debug(applicationId + " is completing, " + " remove "
374                 + containerId + " from NM context.");
375           }
376           context.getContainers().remove(containerId);
377           pendingCompletedContainers.put(containerId, containerStatus);
378         } else {
379           if (!isContainerRecentlyStopped(containerId)) {
380             pendingCompletedContainers.put(containerId, containerStatus);
381           }
382         }
383         // Adding to finished containers cache. Cache will keep it around at
384         // least for #durationToTrackStoppedContainers duration. In the
385         // subsequent call to stop container it will get removed from cache.
386         addCompletedContainer(containerId);
387       } else {
388         containerStatuses.add(containerStatus);
389       }
390     }
391     containerStatuses.addAll(pendingCompletedContainers.values());
392     if (LOG.isDebugEnabled()) {
393       LOG.debug("Sending out " + containerStatuses.size()
394           + " container statuses: " + containerStatuses);
395     }
396     return containerStatuses;
397   }
398 
getRunningApplications()399   private List<ApplicationId> getRunningApplications() {
400     List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
401     runningApplications.addAll(this.context.getApplications().keySet());
402     return runningApplications;
403   }
404 
405   // These NMContainerStatus are sent on NM registration and used by YARN only.
getNMContainerStatuses()406   private List<NMContainerStatus> getNMContainerStatuses() throws IOException {
407     List<NMContainerStatus> containerStatuses =
408         new ArrayList<NMContainerStatus>();
409     for (Container container : this.context.getContainers().values()) {
410       ContainerId containerId = container.getContainerId();
411       ApplicationId applicationId = containerId.getApplicationAttemptId()
412           .getApplicationId();
413       if (!this.context.getApplications().containsKey(applicationId)) {
414         context.getContainers().remove(containerId);
415         continue;
416       }
417       NMContainerStatus status =
418           container.getNMContainerStatus();
419       containerStatuses.add(status);
420       if (status.getContainerState() == ContainerState.COMPLETE) {
421         // Adding to finished containers cache. Cache will keep it around at
422         // least for #durationToTrackStoppedContainers duration. In the
423         // subsequent call to stop container it will get removed from cache.
424         addCompletedContainer(containerId);
425       }
426     }
427     LOG.info("Sending out " + containerStatuses.size()
428       + " NM container statuses: " + containerStatuses);
429     return containerStatuses;
430   }
431 
isApplicationStopped(ApplicationId applicationId)432   private boolean isApplicationStopped(ApplicationId applicationId) {
433     if (!this.context.getApplications().containsKey(applicationId)) {
434       return true;
435     }
436 
437     ApplicationState applicationState = this.context.getApplications().get(
438         applicationId).getApplicationState();
439     if (applicationState == ApplicationState.FINISHING_CONTAINERS_WAIT
440         || applicationState == ApplicationState.APPLICATION_RESOURCES_CLEANINGUP
441         || applicationState == ApplicationState.FINISHED) {
442       return true;
443     } else {
444       return false;
445     }
446   }
447 
448   @Override
addCompletedContainer(ContainerId containerId)449   public void addCompletedContainer(ContainerId containerId) {
450     synchronized (recentlyStoppedContainers) {
451       removeVeryOldStoppedContainersFromCache();
452       if (!recentlyStoppedContainers.containsKey(containerId)) {
453         recentlyStoppedContainers.put(containerId,
454             System.currentTimeMillis() + durationToTrackStoppedContainers);
455       }
456     }
457   }
458 
459   @VisibleForTesting
460   @Private
removeOrTrackCompletedContainersFromContext( List<ContainerId> containerIds)461   public void removeOrTrackCompletedContainersFromContext(
462       List<ContainerId> containerIds) throws IOException {
463     Set<ContainerId> removedContainers = new HashSet<ContainerId>();
464 
465     pendingContainersToRemove.addAll(containerIds);
466     Iterator<ContainerId> iter = pendingContainersToRemove.iterator();
467     while (iter.hasNext()) {
468       ContainerId containerId = iter.next();
469       // remove the container only if the container is at DONE state
470       Container nmContainer = context.getContainers().get(containerId);
471       if (nmContainer == null) {
472         iter.remove();
473       } else if (nmContainer.getContainerState().equals(
474         org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
475         context.getContainers().remove(containerId);
476         removedContainers.add(containerId);
477         iter.remove();
478       }
479     }
480 
481     if (!removedContainers.isEmpty()) {
482       LOG.info("Removed completed containers from NM context: "
483           + removedContainers);
484     }
485     pendingCompletedContainers.clear();
486   }
487 
trackAppsForKeepAlive(List<ApplicationId> appIds)488   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
489     if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
490       for (ApplicationId appId : appIds) {
491         trackAppForKeepAlive(appId);
492       }
493     }
494   }
495 
trackAppForKeepAlive(ApplicationId appId)496   private void trackAppForKeepAlive(ApplicationId appId) {
497     // Next keepAlive request for app between 0.7 & 0.9 of when the token will
498     // likely expire.
499     long nextTime = System.currentTimeMillis()
500     + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
501         * keepAliveDelayRandom.nextInt(100))/100);
502     appTokenKeepAliveMap.put(appId, nextTime);
503   }
504 
505   @Override
sendOutofBandHeartBeat()506   public void sendOutofBandHeartBeat() {
507     synchronized (this.heartbeatMonitor) {
508       this.heartbeatMonitor.notify();
509     }
510   }
511 
isContainerRecentlyStopped(ContainerId containerId)512   public boolean isContainerRecentlyStopped(ContainerId containerId) {
513     synchronized (recentlyStoppedContainers) {
514       return recentlyStoppedContainers.containsKey(containerId);
515     }
516   }
517 
518   @Override
clearFinishedContainersFromCache()519   public void clearFinishedContainersFromCache() {
520     synchronized (recentlyStoppedContainers) {
521       recentlyStoppedContainers.clear();
522     }
523   }
524 
525   @Private
526   @VisibleForTesting
removeVeryOldStoppedContainersFromCache()527   public void removeVeryOldStoppedContainersFromCache() {
528     synchronized (recentlyStoppedContainers) {
529       long currentTime = System.currentTimeMillis();
530       Iterator<ContainerId> i =
531           recentlyStoppedContainers.keySet().iterator();
532       while (i.hasNext()) {
533         ContainerId cid = i.next();
534         if (recentlyStoppedContainers.get(cid) < currentTime) {
535           if (!context.getContainers().containsKey(cid)) {
536             i.remove();
537             try {
538               context.getNMStateStore().removeContainer(cid);
539             } catch (IOException e) {
540               LOG.error("Unable to remove container " + cid + " in store", e);
541             }
542           }
543         } else {
544           break;
545         }
546       }
547     }
548   }
549 
550   @Override
getRMIdentifier()551   public long getRMIdentifier() {
552     return this.rmIdentifier;
553   }
554 
parseCredentials( Map<ApplicationId, ByteBuffer> systemCredentials)555   private static Map<ApplicationId, Credentials> parseCredentials(
556       Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
557     Map<ApplicationId, Credentials> map =
558         new HashMap<ApplicationId, Credentials>();
559     for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
560       Credentials credentials = new Credentials();
561       DataInputByteBuffer buf = new DataInputByteBuffer();
562       ByteBuffer buffer = entry.getValue();
563       buffer.rewind();
564       buf.reset(buffer);
565       credentials.readTokenStorageStream(buf);
566       map.put(entry.getKey(), credentials);
567     }
568     if (LOG.isDebugEnabled()) {
569       for (Map.Entry<ApplicationId, Credentials> entry : map.entrySet()) {
570         LOG.debug("Retrieved credentials form RM for " + entry.getKey() + ": "
571             + entry.getValue().getAllTokens());
572       }
573     }
574     return map;
575   }
576 
  /**
   * Creates the heartbeat runnable and starts it on a new thread named
   * "Node Status Updater".
   *
   * Each loop iteration sends one heartbeat and applies the response: master
   * key updates, SHUTDOWN and RESYNC actions, removal of acknowledged
   * completed containers, container and application cleanup events, and
   * system credentials. The loop ends when {@code isStopped} is set, or after
   * a SHUTDOWN or RESYNC response.
   */
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        // Response id of the last processed heartbeat, echoed back in the next
        // node status.
        int lastHeartBeatID = 0;
        while (!isStopped) {
          // Send heartbeat
          try {
            NodeHeartbeatResponse response = null;
            NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);

            NodeHeartbeatRequest request =
                NodeHeartbeatRequest.newInstance(nodeStatus,
                  NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
                    .getCurrentKey());
            response = resourceTracker.nodeHeartbeat(request);
            //get next heartbeat interval from response
            nextHeartBeatInterval = response.getNextHeartBeatInterval();
            updateMasterKeys(response);

            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
              LOG
                .warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
                    + " hence shutting down.");
              LOG.warn("Message from ResourceManager: "
                  + response.getDiagnosticsMessage());
              context.setDecommissioned(true);
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
              // The finally block below still runs (and waits) before the
              // loop is left.
              break;
            }
            if (response.getNodeAction() == NodeAction.RESYNC) {
              LOG.warn("Node is out of sync with ResourceManager,"
                  + " hence resyncing.");
              LOG.warn("Message from ResourceManager: "
                  + response.getDiagnosticsMessage());
              // Invalidate the RMIdentifier while resync
              NodeStatusUpdaterImpl.this.rmIdentifier =
                  ResourceManagerConstants.RM_INVALID_IDENTIFIER;
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.RESYNC));
              pendingCompletedContainers.clear();
              // As with SHUTDOWN, the finally block runs before the loop
              // exits.
              break;
            }

            // Explicitly put this method after checking the resync response. We
            // don't want to remove the completed containers before resync
            // because these completed containers will be reported back to RM
            // when NM re-registers with RM.
            // Only remove the cleanedup containers that are acked
            removeOrTrackCompletedContainersFromContext(response
                  .getContainersToBeRemovedFromNM());

            lastHeartBeatID = response.getResponseId();
            List<ContainerId> containersToCleanup = response
                .getContainersToCleanup();
            if (!containersToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedContainersEvent(containersToCleanup,
                    CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
            }
            List<ApplicationId> appsToCleanup =
                response.getApplicationsToCleanup();
            //Only start tracking for keepAlive on FINISH_APP
            trackAppsForKeepAlive(appsToCleanup);
            if (!appsToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedAppsEvent(appsToCleanup,
                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
            }

            Map<ApplicationId, ByteBuffer> systemCredentials =
                response.getSystemCredentialsForApps();
            if (systemCredentials != null && !systemCredentials.isEmpty()) {
              ((NMContext) context)
                .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
            }
          } catch (ConnectException e) {
            //catch and throw the exception if tried MAX wait time to connect RM
            dispatcher.getEventHandler().handle(
                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
            // Ends the updater thread; the finally block runs first.
            throw new YarnRuntimeException(e);
          } catch (Throwable e) {

            // TODO Better error handling. Thread can die with the rest of the
            // NM still running.
            // Any other failure is logged and the loop goes on to the next
            // heartbeat.
            LOG.error("Caught exception in status-updater", e);
          } finally {
            // Runs on every path out of the try block, including break and
            // the rethrown ConnectException: wait for the next heartbeat
            // interval, or until sendOutofBandHeartBeat() notifies the
            // monitor.
            synchronized (heartbeatMonitor) {
              // Fall back to the default interval when none has been set yet
              // or the RM sent a non-positive one.
              nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
                  YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
                    nextHeartBeatInterval;
              try {
                heartbeatMonitor.wait(nextHeartBeatInterval);
              } catch (InterruptedException e) {
                // Do Nothing
                // NOTE(review): the interrupt status is not restored here -
                // confirm that dropping it is intended.
              }
            }
          }
        }
      }

      // Installs any master keys carried by the heartbeat response into the
      // container-token and NM-token secret managers.
      private void updateMasterKeys(NodeHeartbeatResponse response) {
        // See if the master-key has rolled over
        MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
        if (updatedMasterKey != null) {
          // Will be non-null only on roll-over on RM side
          context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
        }

        updatedMasterKey = response.getNMTokenMasterKey();
        if (updatedMasterKey != null) {
          context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
        }
      }
    };
    statusUpdater =
        new Thread(statusUpdaterRunnable, "Node Status Updater");
    statusUpdater.start();
  }
701 
702 
703 }
704