/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager;

import java.io.IOException;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.util.YarnVersionInfo;

import com.google.common.annotations.VisibleForTesting;

/**
 * Registers this NodeManager with the ResourceManager and then keeps sending
 * it node heartbeats from a dedicated "Node Status Updater" thread.
 *
 * <p>Each heartbeat carries the node health status, the statuses of the
 * containers held in the NM {@link Context}, and the list of applications
 * whose tokens should be kept alive. The heartbeat response is used to roll
 * master keys, react to SHUTDOWN / RESYNC commands, clean up containers and
 * applications, and install system credentials.
 */
public class NodeStatusUpdaterImpl extends AbstractService implements
    NodeStatusUpdater {

  /**
   * Configuration key for how long a stopped container is remembered in
   * {@code recentlyStoppedContainers}. The value is added to
   * {@link System#currentTimeMillis()}, i.e. it is in milliseconds; the
   * default used by {@link #serviceInit(Configuration)} is 600000.
   */
  public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS =
      YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers";

  private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);

  /** Monitor the updater thread sleeps on between two heartbeats. */
  private final Object heartbeatMonitor = new Object();

  private final Context context;
  private final Dispatcher dispatcher;

  private NodeId nodeId;
  private long nextHeartBeatInterval;
  private ResourceTracker resourceTracker;
  private Resource totalResource;
  private int httpPort;
  private String nodeManagerVersionId;
  private String minimumResourceManagerVersion;
  /** Read by the updater loop on every iteration; set by stop / reboot. */
  private volatile boolean isStopped;
  private boolean tokenKeepAliveEnabled;
  private long tokenRemovalDelayMs;
  /** Keeps track of when the next keep alive request should be sent for an app*/
  private final Map<ApplicationId, Long> appTokenKeepAliveMap =
      new HashMap<ApplicationId, Long>();
  private final Random keepAliveDelayRandom = new Random();
  // It will be used to track recently stopped containers on node manager, this
  // is to avoid the misleading no-such-container exception messages on NM, when
  // the AM finishes it informs the RM to stop the may-be-already-completed
  // containers.
  private final Map<ContainerId, Long> recentlyStoppedContainers;
  // Save the reported completed containers in case of lost heartbeat responses.
  // These completed containers will be sent again till a successful response.
  private final Map<ContainerId, ContainerStatus> pendingCompletedContainers;
  // Duration for which to track recently stopped container.
  private long durationToTrackStoppedContainers;

  private final NodeHealthCheckerService healthChecker;
  private final NodeManagerMetrics metrics;

  private Runnable statusUpdaterRunnable;
  private Thread statusUpdater;
  private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
  // Containers the RM asked to remove that have not yet been dropped from the
  // NM context; re-examined on every successful heartbeat.
  Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();

  /**
   * Creates the updater. No RM communication happens here; registration is
   * done in {@link #serviceStart()}.
   *
   * @param context NM context that supplies containers, applications, secret
   *          managers and the state store
   * @param dispatcher dispatcher used to raise NodeManager / container-manager
   *          events from heartbeat responses
   * @param healthChecker source of the node health report sent in heartbeats
   * @param metrics metrics object that is told about the node's total resource
   */
  public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
    super(NodeStatusUpdaterImpl.class.getName());
    this.healthChecker = healthChecker;
    this.context = context;
    this.dispatcher = dispatcher;
    this.metrics = metrics;
    // Insertion order matters: removeVeryOldStoppedContainersFromCache() stops
    // scanning at the first entry that has not expired yet.
    this.recentlyStoppedContainers =
        new LinkedHashMap<ContainerId, Long>();
    this.pendingCompletedContainers =
        new HashMap<ContainerId, ContainerStatus>();
  }

  /**
   * Reads node resources, token keep-alive settings, the minimum accepted RM
   * version and the stopped-container tracking duration from configuration.
   *
   * @param conf service configuration
   * @throws YarnException if the configured tracking duration is negative
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    int memoryMb =
        conf.getInt(
            YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
    float vMemToPMem =
        conf.getFloat(
            YarnConfiguration.NM_VMEM_PMEM_RATIO,
            YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    // Only used for the informational log line at the end of this method.
    int virtualMemoryMb = (int) Math.ceil(memoryMb * vMemToPMem);

    int virtualCores =
        conf.getInt(
            YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);

    this.totalResource = Resource.newInstance(memoryMb, virtualCores);
    metrics.addResource(totalResource);
    this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
    this.tokenRemovalDelayMs =
        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);

    this.minimumResourceManagerVersion = conf.get(
        YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
        YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);

    // Default duration to track stopped containers on nodemanager is 10Min.
    // This should not be assigned very large value as it will remember all the
    // containers stopped during that time.
    durationToTrackStoppedContainers =
        conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
            600000);
    if (durationToTrackStoppedContainers < 0) {
      String message = "Invalid configuration for "
          + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
          + "value is 10Min(600000).";
      LOG.error(message);
      throw new YarnException(message);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
          + durationToTrackStoppedContainers);
    }
    super.serviceInit(conf);
    // NOTE: nodeId is only assigned in serviceStart(), so it has not been set
    // yet when this line is logged.
    LOG.info("Initialized nodemanager for " + nodeId + ":" +
        " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
        " virtual-cores=" + virtualCores);
  }

  /**
   * Captures the node identity from the context, creates the RM proxy,
   * registers with the RM and starts the heartbeat thread.
   *
   * @throws YarnRuntimeException wrapping any failure during the above
   */
  @Override
  protected void serviceStart() throws Exception {

    // NodeManager is the last service to start, so NodeId is available.
    this.nodeId = this.context.getNodeId();
    this.httpPort = this.context.getHttpPort();
    this.nodeManagerVersionId = YarnVersionInfo.getVersion();
    try {
      // Registration has to be in start so that ContainerManager can get the
      // perNM tokens needed to authenticate ContainerTokens.
      this.resourceTracker = getRMClient();
      registerWithRM();
      super.serviceStart();
      startStatusUpdater();
    } catch (Exception e) {
      String errorMessage = "Unexpected error starting NodeStatusUpdater";
      LOG.error(errorMessage, e);
      throw new YarnRuntimeException(e);
    }
  }

  /**
   * Flags the heartbeat loop to exit and closes the RM proxy.
   */
  @Override
  protected void serviceStop() throws Exception {
    // Signal the updater loop to exit; it checks this flag before every
    // heartbeat. No thread interrupt is issued.
    this.isStopped = true;
    stopRMProxy();
    super.serviceStop();
  }

  /**
   * Stops the current heartbeat thread, re-registers with the RM and starts a
   * fresh heartbeat thread running the same runnable.
   *
   * @throws YarnRuntimeException wrapping any failure (including being
   *           interrupted while waiting for the old thread)
   */
  protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
    // Ask the updater loop to exit.
    this.isStopped = true;
    // The updater normally sleeps on heartbeatMonitor between heartbeats (and
    // also after breaking out of the loop on RESYNC, because the wait sits in
    // a finally block). Wake it so join() below does not have to sit out the
    // remainder of the heartbeat interval. If the thread is not waiting at
    // this instant the notification is simply lost and the wait times out on
    // its own.
    synchronized (this.heartbeatMonitor) {
      this.heartbeatMonitor.notify();
    }

    try {
      statusUpdater.join();
      registerWithRM();
      statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
      this.isStopped = false;
      statusUpdater.start();
      LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
    } catch (Exception e) {
      String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
      LOG.error(errorMessage, e);
      throw new YarnRuntimeException(e);
    }
  }

  /**
   * Stops the RM proxy if one was created.
   */
  @VisibleForTesting
  protected void stopRMProxy() {
    if (this.resourceTracker != null) {
      RPC.stopProxy(this.resourceTracker);
    }
  }

  /**
   * @param conf configuration to inspect
   * @return {@code true} only when log aggregation is enabled in
   *         {@code conf} and security is enabled
   */
  @Private
  protected boolean isTokenKeepAliveEnabled(Configuration conf) {
    return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
        && UserGroupInformation.isSecurityEnabled();
  }

  /**
   * @return a {@link ResourceTracker} proxy built from this service's
   *         configuration
   * @throws IOException if the proxy cannot be created
   */
  @VisibleForTesting
  protected ResourceTracker getRMClient() throws IOException {
    Configuration conf = getConfig();
    return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
  }

  /**
   * Sends a registration request (node id, http port, resources, version,
   * container reports, running applications) to the RM and applies the
   * response: stores the RM identifier, validates the RM version, installs
   * the container-token and NM-token master keys, and unblocks new container
   * requests on the container manager.
   *
   * @throws YarnRuntimeException if the RM answers SHUTDOWN, returns no
   *           version while a minimum version is configured, or is older than
   *           the configured minimum version
   * @throws YarnException propagated from the registration call
   * @throws IOException propagated from the registration call
   */
  @VisibleForTesting
  protected void registerWithRM()
      throws YarnException, IOException {
    List<NMContainerStatus> containerReports = getNMContainerStatuses();
    RegisterNodeManagerRequest request =
        RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
            nodeManagerVersionId, containerReports, getRunningApplications());
    if (containerReports != null) {
      LOG.info("Registering with RM using containers :" + containerReports);
    }
    RegisterNodeManagerResponse regNMResponse =
        resourceTracker.registerNodeManager(request);
    this.rmIdentifier = regNMResponse.getRMIdentifier();
    // if the Resourcemanager instructs NM to shutdown.
    if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
      String message =
          "Message from ResourceManager: "
              + regNMResponse.getDiagnosticsMessage();
      throw new YarnRuntimeException(
          "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
              + message);
    }

    // if ResourceManager version is too old then shutdown
    if (!minimumResourceManagerVersion.equals("NONE")) {
      if (minimumResourceManagerVersion.equals("EqualToNM")) {
        // Resolve the symbolic setting once; later registrations then compare
        // against the concrete NM version directly.
        minimumResourceManagerVersion = nodeManagerVersionId;
      }
      String rmVersion = regNMResponse.getRMVersion();
      if (rmVersion == null) {
        String message = "The Resource Manager's did not return a version. "
            + "Valid version cannot be checked.";
        throw new YarnRuntimeException("Shutting down the Node Manager. "
            + message);
      }
      if (VersionUtil.compareVersions(rmVersion, minimumResourceManagerVersion) < 0) {
        String message = "The Resource Manager's version ("
            + rmVersion + ") is less than the minimum "
            + "allowed version " + minimumResourceManagerVersion;
        throw new YarnRuntimeException("Shutting down the Node Manager on RM "
            + "version error, " + message);
      }
    }
    MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
    // do this now so that its set before we start heartbeating to RM
    // It is expected that status updater is started by this point and
    // RM gives the shared secret in registration during
    // StatusUpdater#start().
    if (masterKey != null) {
      this.context.getContainerTokenSecretManager().setMasterKey(masterKey);
    }

    masterKey = regNMResponse.getNMTokenMasterKey();
    if (masterKey != null) {
      this.context.getNMTokenSecretManager().setMasterKey(masterKey);
    }

    LOG.info("Registered with ResourceManager as " + this.nodeId
        + " with total resource of " + this.totalResource);
    LOG.info("Notifying ContainerManager to unblock new container-requests");
    ((ContainerManagerImpl) this.context.getContainerManager())
        .setBlockNewContainerRequests(false);
  }

  /**
   * Builds the list of applications whose keep-alive time has passed and
   * schedules their next keep-alive. Applications that are no longer in the
   * context are dropped from the keep-alive map.
   *
   * @return applications to include in the next heartbeat; empty when token
   *         keep-alive is disabled
   */
  private List<ApplicationId> createKeepAliveApplicationList() {
    if (!tokenKeepAliveEnabled) {
      return Collections.emptyList();
    }

    List<ApplicationId> appList = new ArrayList<ApplicationId>();
    for (Iterator<Entry<ApplicationId, Long>> i =
        this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
      Entry<ApplicationId, Long> e = i.next();
      ApplicationId appId = e.getKey();
      Long nextKeepAlive = e.getValue();
      if (!this.context.getApplications().containsKey(appId)) {
        // Remove if the application has finished.
        i.remove();
      } else if (System.currentTimeMillis() > nextKeepAlive) {
        // KeepAlive list for the next heartbeat.
        appList.add(appId);
        // Overwrites the value of a key that is already present, which is not
        // a structural modification of the HashMap being iterated.
        trackAppForKeepAlive(appId);
      }
    }
    return appList;
  }

  /**
   * Assembles the node status for one heartbeat: refreshes the shared
   * {@link NodeHealthStatus} from the health checker and attaches container
   * statuses and the keep-alive application list.
   *
   * @param responseId id of the last heartbeat response received
   * @return the status to send to the RM
   * @throws IOException propagated from {@link #getContainerStatuses()}
   */
  private NodeStatus getNodeStatus(int responseId) throws IOException {

    NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
    nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
    nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
    nodeHealthStatus.setLastHealthReportTime(healthChecker
        .getLastHealthReportTime());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
          + ", " + nodeHealthStatus.getHealthReport());
    }
    List<ContainerStatus> containersStatuses = getContainerStatuses();
    NodeStatus nodeStatus =
        NodeStatus.newInstance(nodeId, responseId, containersStatuses,
            createKeepAliveApplicationList(), nodeHealthStatus);

    return nodeStatus;
  }

  /**
   * Iterates through the NM context and clones all the containers' statuses.
   * Non-completed containers are reported directly. Completed containers are
   * added to the recently-stopped cache and, unless already reported for a
   * still-running application, to {@code pendingCompletedContainers}; every
   * pending completed status is appended to the result. Completed containers
   * of a stopped application are also removed from the context.
   *
   * @return statuses to report in the next heartbeat
   * @throws IOException declared for overriding implementations
   */
  @VisibleForTesting
  protected List<ContainerStatus> getContainerStatuses() throws IOException {
    List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
    for (Container container : this.context.getContainers().values()) {
      ContainerId containerId = container.getContainerId();
      ApplicationId applicationId = containerId.getApplicationAttemptId()
          .getApplicationId();
      ContainerStatus containerStatus =
          container.cloneAndGetContainerStatus();
      if (containerStatus.getState() == ContainerState.COMPLETE) {
        if (isApplicationStopped(applicationId)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(applicationId + " is completing, " + " remove "
                + containerId + " from NM context.");
          }
          context.getContainers().remove(containerId);
          pendingCompletedContainers.put(containerId, containerStatus);
        } else {
          // Must be checked before addCompletedContainer() below, which puts
          // the container into the recently-stopped cache.
          if (!isContainerRecentlyStopped(containerId)) {
            pendingCompletedContainers.put(containerId, containerStatus);
          }
        }
        // Adding to finished containers cache. Cache will keep it around at
        // least for #durationToTrackStoppedContainers duration. In the
        // subsequent call to stop container it will get removed from cache.
        addCompletedContainer(containerId);
      } else {
        containerStatuses.add(containerStatus);
      }
    }
    containerStatuses.addAll(pendingCompletedContainers.values());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Sending out " + containerStatuses.size()
          + " container statuses: " + containerStatuses);
    }
    return containerStatuses;
  }

  /**
   * @return a new list holding the ids of all applications in the context
   */
  private List<ApplicationId> getRunningApplications() {
    List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
    runningApplications.addAll(this.context.getApplications().keySet());
    return runningApplications;
  }

  /**
   * Collects the {@link NMContainerStatus} of every container whose
   * application is still in the context; containers of unknown applications
   * are removed from the context instead. Completed containers are added to
   * the recently-stopped cache.
   *
   * <p>These NMContainerStatus are sent on NM registration and used by YARN
   * only.
   *
   * @return container reports for the registration request
   * @throws IOException declared by the original signature
   */
  private List<NMContainerStatus> getNMContainerStatuses() throws IOException {
    List<NMContainerStatus> containerStatuses =
        new ArrayList<NMContainerStatus>();
    for (Container container : this.context.getContainers().values()) {
      ContainerId containerId = container.getContainerId();
      ApplicationId applicationId = containerId.getApplicationAttemptId()
          .getApplicationId();
      if (!this.context.getApplications().containsKey(applicationId)) {
        context.getContainers().remove(containerId);
        continue;
      }
      NMContainerStatus status =
          container.getNMContainerStatus();
      containerStatuses.add(status);
      if (status.getContainerState() == ContainerState.COMPLETE) {
        // Adding to finished containers cache. Cache will keep it around at
        // least for #durationToTrackStoppedContainers duration. In the
        // subsequent call to stop container it will get removed from cache.
        addCompletedContainer(containerId);
      }
    }
    LOG.info("Sending out " + containerStatuses.size()
        + " NM container statuses: " + containerStatuses);
    return containerStatuses;
  }

  /**
   * @param applicationId application to check
   * @return {@code true} if the application is absent from the context or is
   *         in one of the states FINISHING_CONTAINERS_WAIT,
   *         APPLICATION_RESOURCES_CLEANINGUP or FINISHED
   */
  private boolean isApplicationStopped(ApplicationId applicationId) {
    if (!this.context.getApplications().containsKey(applicationId)) {
      return true;
    }

    ApplicationState applicationState = this.context.getApplications().get(
        applicationId).getApplicationState();
    return applicationState == ApplicationState.FINISHING_CONTAINERS_WAIT
        || applicationState == ApplicationState.APPLICATION_RESOURCES_CLEANINGUP
        || applicationState == ApplicationState.FINISHED;
  }

  /**
   * Remembers a completed container in the recently-stopped cache, after
   * first evicting expired entries. An already-tracked container keeps its
   * original expiry time.
   *
   * @param containerId id of the completed container
   */
  @Override
  public void addCompletedContainer(ContainerId containerId) {
    synchronized (recentlyStoppedContainers) {
      removeVeryOldStoppedContainersFromCache();
      if (!recentlyStoppedContainers.containsKey(containerId)) {
        recentlyStoppedContainers.put(containerId,
            System.currentTimeMillis() + durationToTrackStoppedContainers);
      }
    }
  }

  /**
   * Processes the RM's list of containers that may be removed from the NM.
   * The ids are merged into {@code pendingContainersToRemove}; each pending
   * id is dropped if the container is no longer in the context, or removed
   * from the context if the container has reached the DONE state. Containers
   * in any other state stay pending for a later call. Finally the set of
   * pending completed container statuses is cleared.
   *
   * @param containerIds ids the RM reported as removable
   * @throws IOException declared by the original signature
   */
  @VisibleForTesting
  @Private
  public void removeOrTrackCompletedContainersFromContext(
      List<ContainerId> containerIds) throws IOException {
    Set<ContainerId> removedContainers = new HashSet<ContainerId>();

    pendingContainersToRemove.addAll(containerIds);
    Iterator<ContainerId> iter = pendingContainersToRemove.iterator();
    while (iter.hasNext()) {
      ContainerId containerId = iter.next();
      // remove the container only if the container is at DONE state
      Container nmContainer = context.getContainers().get(containerId);
      if (nmContainer == null) {
        iter.remove();
      } else if (nmContainer.getContainerState().equals(
          org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
        context.getContainers().remove(containerId);
        removedContainers.add(containerId);
        iter.remove();
      }
    }

    if (!removedContainers.isEmpty()) {
      LOG.info("Removed completed containers from NM context: "
          + removedContainers);
    }
    pendingCompletedContainers.clear();
  }

  /**
   * Schedules keep-alive for each given application, when token keep-alive
   * is enabled.
   *
   * @param appIds applications to track; may be {@code null} or empty
   */
  private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
    if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
      for (ApplicationId appId : appIds) {
        trackAppForKeepAlive(appId);
      }
    }
  }

  /**
   * Records the time of the next keep-alive request for an application.
   *
   * @param appId application to schedule
   */
  private void trackAppForKeepAlive(ApplicationId appId) {
    // Next keepAlive request for app between 0.7 & 0.9 of when the token will
    // likely expire.
    long nextTime = System.currentTimeMillis()
        + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
            * keepAliveDelayRandom.nextInt(100)) / 100);
    appTokenKeepAliveMap.put(appId, nextTime);
  }

  /**
   * Wakes the heartbeat thread if it is currently waiting between heartbeats.
   */
  @Override
  public void sendOutofBandHeartBeat() {
    synchronized (this.heartbeatMonitor) {
      this.heartbeatMonitor.notify();
    }
  }

  /**
   * @param containerId container to look up
   * @return {@code true} if the container is in the recently-stopped cache
   */
  public boolean isContainerRecentlyStopped(ContainerId containerId) {
    synchronized (recentlyStoppedContainers) {
      return recentlyStoppedContainers.containsKey(containerId);
    }
  }

  /**
   * Empties the recently-stopped container cache.
   */
  @Override
  public void clearFinishedContainersFromCache() {
    synchronized (recentlyStoppedContainers) {
      recentlyStoppedContainers.clear();
    }
  }

  /**
   * Evicts expired entries from the recently-stopped cache. An expired entry
   * is evicted (and removed from the NM state store) only if its container is
   * no longer in the context. Scanning stops at the first entry that has not
   * expired.
   */
  @Private
  @VisibleForTesting
  public void removeVeryOldStoppedContainersFromCache() {
    synchronized (recentlyStoppedContainers) {
      long currentTime = System.currentTimeMillis();
      Iterator<Entry<ContainerId, Long>> i =
          recentlyStoppedContainers.entrySet().iterator();
      while (i.hasNext()) {
        Entry<ContainerId, Long> entry = i.next();
        ContainerId cid = entry.getKey();
        if (entry.getValue() < currentTime) {
          if (!context.getContainers().containsKey(cid)) {
            i.remove();
            try {
              context.getNMStateStore().removeContainer(cid);
            } catch (IOException e) {
              LOG.error("Unable to remove container " + cid + " in store", e);
            }
          }
        } else {
          // The map iterates in insertion order and all entries share the
          // same tracking duration, so later entries expire later.
          break;
        }
      }
    }
  }

  /**
   * @return the RM identifier from the last registration, or
   *         {@link ResourceManagerConstants#RM_INVALID_IDENTIFIER} before
   *         registration and while a resync is in progress
   */
  @Override
  public long getRMIdentifier() {
    return this.rmIdentifier;
  }

  /**
   * Deserializes the per-application credentials sent by the RM.
   *
   * @param systemCredentials serialized token storage per application; each
   *          buffer is rewound before being read
   * @return the parsed credentials keyed by application
   * @throws IOException if a buffer cannot be parsed
   */
  private static Map<ApplicationId, Credentials> parseCredentials(
      Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
    Map<ApplicationId, Credentials> map =
        new HashMap<ApplicationId, Credentials>();
    for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
      Credentials credentials = new Credentials();
      DataInputByteBuffer buf = new DataInputByteBuffer();
      ByteBuffer buffer = entry.getValue();
      buffer.rewind();
      buf.reset(buffer);
      credentials.readTokenStorageStream(buf);
      map.put(entry.getKey(), credentials);
    }
    if (LOG.isDebugEnabled()) {
      for (Map.Entry<ApplicationId, Credentials> entry : map.entrySet()) {
        LOG.debug("Retrieved credentials form RM for " + entry.getKey() + ": "
            + entry.getValue().getAllTokens());
      }
    }
    return map;
  }

  /**
   * Creates the heartbeat runnable and starts it on a new thread named
   * "Node Status Updater". The runnable is kept so that
   * {@link #rebootNodeStatusUpdaterAndRegisterWithRM()} can start it again.
   */
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {
          // Send heartbeat
          try {
            NodeHeartbeatResponse response = null;
            NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);

            NodeHeartbeatRequest request =
                NodeHeartbeatRequest.newInstance(nodeStatus,
                    NodeStatusUpdaterImpl.this.context
                        .getContainerTokenSecretManager().getCurrentKey(),
                    NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
                        .getCurrentKey());
            response = resourceTracker.nodeHeartbeat(request);
            //get next heartbeat interval from response
            nextHeartBeatInterval = response.getNextHeartBeatInterval();
            updateMasterKeys(response);

            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
              LOG
                  .warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
                      + " hence shutting down.");
              LOG.warn("Message from ResourceManager: "
                  + response.getDiagnosticsMessage());
              context.setDecommissioned(true);
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
              break;
            }
            if (response.getNodeAction() == NodeAction.RESYNC) {
              LOG.warn("Node is out of sync with ResourceManager,"
                  + " hence resyncing.");
              LOG.warn("Message from ResourceManager: "
                  + response.getDiagnosticsMessage());
              // Invalidate the RMIdentifier while resync
              NodeStatusUpdaterImpl.this.rmIdentifier =
                  ResourceManagerConstants.RM_INVALID_IDENTIFIER;
              dispatcher.getEventHandler().handle(
                  new NodeManagerEvent(NodeManagerEventType.RESYNC));
              pendingCompletedContainers.clear();
              break;
            }

            // Explicitly put this method after checking the resync response. We
            // don't want to remove the completed containers before resync
            // because these completed containers will be reported back to RM
            // when NM re-registers with RM.
            // Only remove the cleanedup containers that are acked
            removeOrTrackCompletedContainersFromContext(response
                .getContainersToBeRemovedFromNM());

            lastHeartBeatID = response.getResponseId();
            List<ContainerId> containersToCleanup = response
                .getContainersToCleanup();
            if (!containersToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedContainersEvent(containersToCleanup,
                      CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
            }
            List<ApplicationId> appsToCleanup =
                response.getApplicationsToCleanup();
            //Only start tracking for keepAlive on FINISH_APP
            trackAppsForKeepAlive(appsToCleanup);
            if (!appsToCleanup.isEmpty()) {
              dispatcher.getEventHandler().handle(
                  new CMgrCompletedAppsEvent(appsToCleanup,
                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
            }

            Map<ApplicationId, ByteBuffer> systemCredentials =
                response.getSystemCredentialsForApps();
            if (systemCredentials != null && !systemCredentials.isEmpty()) {
              ((NMContext) context)
                  .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
            }
          } catch (ConnectException e) {
            //catch and throw the exception if tried MAX wait time to connect RM
            dispatcher.getEventHandler().handle(
                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
            throw new YarnRuntimeException(e);
          } catch (Throwable e) {

            // TODO Better error handling. Thread can die with the rest of the
            // NM still running.
            LOG.error("Caught exception in status-updater", e);
          } finally {
            // Runs on every path out of the try block, including the breaks
            // above and the rethrow in the ConnectException handler.
            synchronized (heartbeatMonitor) {
              nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
                  YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
                    nextHeartBeatInterval;
              try {
                heartbeatMonitor.wait(nextHeartBeatInterval);
              } catch (InterruptedException e) {
                // Do Nothing
              }
            }
          }
        }
      }

      /**
       * Installs any rolled-over master keys carried by a heartbeat response.
       */
      private void updateMasterKeys(NodeHeartbeatResponse response) {
        // See if the master-key has rolled over
        MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
        if (updatedMasterKey != null) {
          // Will be non-null only on roll-over on RM side
          context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey);
        }

        updatedMasterKey = response.getNMTokenMasterKey();
        if (updatedMasterKey != null) {
          context.getNMTokenSecretManager().setMasterKey(updatedMasterKey);
        }
      }
    };
    statusUpdater =
        new Thread(statusUpdaterRunnable, "Node Status Updater");
    statusUpdater.start();
  }


}