1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.net.InetSocketAddress; 24 import java.util.Map; 25 import java.util.Set; 26 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 import org.apache.hadoop.classification.InterfaceAudience; 30 import org.apache.hadoop.conf.Configuration; 31 import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 32 import org.apache.hadoop.ha.HAServiceProtocol; 33 import org.apache.hadoop.ha.HAServiceStatus; 34 import org.apache.hadoop.ha.HealthCheckFailedException; 35 import org.apache.hadoop.ha.ServiceFailedException; 36 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; 37 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; 38 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; 39 import org.apache.hadoop.ipc.ProtobufRpcEngine; 40 import org.apache.hadoop.ipc.RPC; 41 import org.apache.hadoop.ipc.RPC.Server; 42 import org.apache.hadoop.ipc.StandbyException; 43 import org.apache.hadoop.security.AccessControlException; 44 import org.apache.hadoop.security.Groups; 45 import org.apache.hadoop.security.UserGroupInformation; 46 import org.apache.hadoop.security.authorize.AccessControlList; 47 import org.apache.hadoop.security.authorize.PolicyProvider; 48 import org.apache.hadoop.security.authorize.ProxyUsers; 49 import org.apache.hadoop.service.CompositeService; 50 import org.apache.hadoop.yarn.api.records.NodeId; 51 import org.apache.hadoop.yarn.api.records.ResourceOption; 52 import org.apache.hadoop.yarn.conf.HAUtil; 53 import org.apache.hadoop.yarn.conf.YarnConfiguration; 54 import org.apache.hadoop.yarn.exceptions.YarnException; 55 import org.apache.hadoop.yarn.factories.RecordFactory; 56 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 57 import org.apache.hadoop.yarn.ipc.RPCUtil; 58 import org.apache.hadoop.yarn.ipc.YarnRPC; 59 import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer; 60 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; 61 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; 62 import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; 63 import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; 64 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; 65 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; 66 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; 67 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; 68 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; 69 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; 70 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; 71 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; 72 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; 73 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; 74 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; 75 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; 76 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; 77 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; 78 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; 79 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; 80 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; 81 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; 82 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; 83 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; 84 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; 85 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; 86 87 import com.google.common.annotations.VisibleForTesting; 88 import com.google.protobuf.BlockingService; 89 90 public class AdminService extends CompositeService implements 91 HAServiceProtocol, ResourceManagerAdministrationProtocol { 92 93 private static final Log LOG = LogFactory.getLog(AdminService.class); 94 95 private final RMContext rmContext; 96 private final ResourceManager rm; 97 private String rmId; 98 99 private boolean autoFailoverEnabled; 100 private EmbeddedElectorService embeddedElector; 101 102 private Server server; 103 104 // Address to use for binding. May be a wildcard address. 105 private InetSocketAddress masterServiceBindAddress; 106 107 private YarnAuthorizationProvider authorizer; 108 109 private final RecordFactory recordFactory = 110 RecordFactoryProvider.getRecordFactory(null); 111 112 private UserGroupInformation daemonUser; 113 AdminService(ResourceManager rm, RMContext rmContext)114 public AdminService(ResourceManager rm, RMContext rmContext) { 115 super(AdminService.class.getName()); 116 this.rm = rm; 117 this.rmContext = rmContext; 118 } 119 120 @Override serviceInit(Configuration conf)121 public void serviceInit(Configuration conf) throws Exception { 122 if (rmContext.isHAEnabled()) { 123 autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf); 124 if (autoFailoverEnabled) { 125 if (HAUtil.isAutomaticFailoverEmbedded(conf)) { 126 embeddedElector = createEmbeddedElectorService(); 127 addIfService(embeddedElector); 128 } 129 } 130 } 131 132 masterServiceBindAddress = conf.getSocketAddr( 133 YarnConfiguration.RM_BIND_HOST, 134 YarnConfiguration.RM_ADMIN_ADDRESS, 135 YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, 136 YarnConfiguration.DEFAULT_RM_ADMIN_PORT); 137 daemonUser = UserGroupInformation.getCurrentUser(); 138 authorizer = YarnAuthorizationProvider.getInstance(conf); 139 authorizer.setAdmins(getAdminAclList(conf), UserGroupInformation 140 .getCurrentUser()); 141 rmId = conf.get(YarnConfiguration.RM_HA_ID); 142 super.serviceInit(conf); 143 } 144 getAdminAclList(Configuration conf)145 private AccessControlList getAdminAclList(Configuration conf) { 146 AccessControlList aclList = 147 new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL, 148 YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); 149 aclList.addUser(daemonUser.getShortUserName()); 150 return aclList; 151 } 152 153 @Override serviceStart()154 protected void serviceStart() throws Exception { 155 startServer(); 156 super.serviceStart(); 157 } 158 159 @Override serviceStop()160 protected void serviceStop() throws Exception { 161 stopServer(); 162 super.serviceStop(); 163 } 164 startServer()165 protected void startServer() throws Exception { 166 Configuration conf = getConfig(); 167 YarnRPC rpc = YarnRPC.create(conf); 168 this.server = (Server) rpc.getServer( 169 ResourceManagerAdministrationProtocol.class, this, masterServiceBindAddress, 170 conf, null, 171 conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, 172 YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); 173 174 // Enable service authorization? 175 if (conf.getBoolean( 176 CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 177 false)) { 178 refreshServiceAcls( 179 getConfiguration(conf, 180 YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE), 181 RMPolicyProvider.getInstance()); 182 } 183 184 if (rmContext.isHAEnabled()) { 185 RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, 186 ProtobufRpcEngine.class); 187 188 HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = 189 new HAServiceProtocolServerSideTranslatorPB(this); 190 BlockingService haPbService = 191 HAServiceProtocolProtos.HAServiceProtocolService 192 .newReflectiveBlockingService(haServiceProtocolXlator); 193 server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, 194 HAServiceProtocol.class, haPbService); 195 } 196 197 this.server.start(); 198 conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, 199 YarnConfiguration.RM_ADMIN_ADDRESS, 200 YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, 201 server.getListenerAddress()); 202 } 203 stopServer()204 protected void stopServer() throws Exception { 205 if (this.server != null) { 206 this.server.stop(); 207 } 208 } 209 createEmbeddedElectorService()210 protected EmbeddedElectorService createEmbeddedElectorService() { 211 return new EmbeddedElectorService(rmContext); 212 } 213 214 @InterfaceAudience.Private resetLeaderElection()215 void resetLeaderElection() { 216 if (embeddedElector != null) { 217 embeddedElector.resetLeaderElection(); 218 } 219 } 220 checkAccess(String method)221 private UserGroupInformation checkAccess(String method) throws IOException { 222 return RMServerUtils.verifyAdminAccess(authorizer, method, LOG); 223 } 224 checkAcls(String method)225 private UserGroupInformation checkAcls(String method) throws YarnException { 226 try { 227 return checkAccess(method); 228 } catch (IOException ioe) { 229 throw RPCUtil.getRemoteException(ioe); 230 } 231 } 232 233 /** 234 * Check that a request to change this node's HA state is valid. 235 * In particular, verifies that, if auto failover is enabled, non-forced 236 * requests from the HAAdmin CLI are rejected, and vice versa. 237 * 238 * @param req the request to check 239 * @throws AccessControlException if the request is disallowed 240 */ checkHaStateChange(StateChangeRequestInfo req)241 private void checkHaStateChange(StateChangeRequestInfo req) 242 throws AccessControlException { 243 switch (req.getSource()) { 244 case REQUEST_BY_USER: 245 if (autoFailoverEnabled) { 246 throw new AccessControlException( 247 "Manual failover for this ResourceManager is disallowed, " + 248 "because automatic failover is enabled."); 249 } 250 break; 251 case REQUEST_BY_USER_FORCED: 252 if (autoFailoverEnabled) { 253 LOG.warn("Allowing manual failover from " + 254 org.apache.hadoop.ipc.Server.getRemoteAddress() + 255 " even though automatic failover is enabled, because the user " + 256 "specified the force flag"); 257 } 258 break; 259 case REQUEST_BY_ZKFC: 260 if (!autoFailoverEnabled) { 261 throw new AccessControlException( 262 "Request from ZK failover controller at " + 263 org.apache.hadoop.ipc.Server.getRemoteAddress() + " denied " + 264 "since automatic failover is not enabled"); 265 } 266 break; 267 } 268 } 269 isRMActive()270 private synchronized boolean isRMActive() { 271 return HAServiceState.ACTIVE == rmContext.getHAServiceState(); 272 } 273 throwStandbyException()274 private void throwStandbyException() throws StandbyException { 275 throw new StandbyException("ResourceManager " + rmId + " is not Active!"); 276 } 277 278 @Override monitorHealth()279 public synchronized void monitorHealth() 280 throws IOException { 281 checkAccess("monitorHealth"); 282 if (isRMActive() && !rm.areActiveServicesRunning()) { 283 throw new HealthCheckFailedException( 284 "Active ResourceManager services are not running!"); 285 } 286 } 287 288 @SuppressWarnings("unchecked") 289 @Override transitionToActive( HAServiceProtocol.StateChangeRequestInfo reqInfo)290 public synchronized void transitionToActive( 291 HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { 292 // call refreshAdminAcls before HA state transition 293 // for the case that adminAcls have been updated in previous active RM 294 try { 295 refreshAdminAcls(false); 296 } catch (YarnException ex) { 297 throw new ServiceFailedException("Can not execute refreshAdminAcls", ex); 298 } 299 300 UserGroupInformation user = checkAccess("transitionToActive"); 301 checkHaStateChange(reqInfo); 302 try { 303 rm.transitionToActive(); 304 } catch (Exception e) { 305 RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", 306 "", "RMHAProtocolService", 307 "Exception transitioning to active"); 308 throw new ServiceFailedException( 309 "Error when transitioning to Active mode", e); 310 } 311 try { 312 // call all refresh*s for active RM to get the updated configurations. 313 refreshAll(); 314 } catch (Exception e) { 315 LOG.error("RefreshAll failed so firing fatal event", e); 316 rmContext 317 .getDispatcher() 318 .getEventHandler() 319 .handle( 320 new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e)); 321 throw new ServiceFailedException( 322 "Error on refreshAll during transistion to Active", e); 323 } 324 RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", 325 "RMHAProtocolService"); 326 } 327 328 @Override transitionToStandby( HAServiceProtocol.StateChangeRequestInfo reqInfo)329 public synchronized void transitionToStandby( 330 HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { 331 // call refreshAdminAcls before HA state transition 332 // for the case that adminAcls have been updated in previous active RM 333 try { 334 refreshAdminAcls(false); 335 } catch (YarnException ex) { 336 throw new ServiceFailedException("Can not execute refreshAdminAcls", ex); 337 } 338 UserGroupInformation user = checkAccess("transitionToStandby"); 339 checkHaStateChange(reqInfo); 340 try { 341 rm.transitionToStandby(true); 342 RMAuditLogger.logSuccess(user.getShortUserName(), 343 "transitionToStandby", "RMHAProtocolService"); 344 } catch (Exception e) { 345 RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", 346 "", "RMHAProtocolService", 347 "Exception transitioning to standby"); 348 throw new ServiceFailedException( 349 "Error when transitioning to Standby mode", e); 350 } 351 } 352 353 @Override getServiceStatus()354 public synchronized HAServiceStatus getServiceStatus() throws IOException { 355 checkAccess("getServiceState"); 356 HAServiceState haState = rmContext.getHAServiceState(); 357 HAServiceStatus ret = new HAServiceStatus(haState); 358 if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { 359 ret.setReadyToBecomeActive(); 360 } else { 361 ret.setNotReadyToBecomeActive("State is " + haState); 362 } 363 return ret; 364 } 365 366 @Override refreshQueues(RefreshQueuesRequest request)367 public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) 368 throws YarnException, StandbyException { 369 String argName = "refreshQueues"; 370 final String msg = "refresh queues."; 371 UserGroupInformation user = checkAcls(argName); 372 373 checkRMStatus(user.getShortUserName(), argName, msg); 374 375 RefreshQueuesResponse response = 376 recordFactory.newRecordInstance(RefreshQueuesResponse.class); 377 try { 378 rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); 379 // refresh the reservation system 380 ReservationSystem rSystem = rmContext.getReservationSystem(); 381 if (rSystem != null) { 382 rSystem.reinitialize(getConfig(), rmContext); 383 } 384 RMAuditLogger.logSuccess(user.getShortUserName(), argName, 385 "AdminService"); 386 return response; 387 } catch (IOException ioe) { 388 throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); 389 } 390 } 391 392 @Override refreshNodes(RefreshNodesRequest request)393 public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) 394 throws YarnException, StandbyException { 395 String argName = "refreshNodes"; 396 final String msg = "refresh nodes."; 397 UserGroupInformation user = checkAcls("refreshNodes"); 398 399 checkRMStatus(user.getShortUserName(), argName, msg); 400 401 try { 402 Configuration conf = 403 getConfiguration(new Configuration(false), 404 YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); 405 rmContext.getNodesListManager().refreshNodes(conf); 406 RMAuditLogger.logSuccess(user.getShortUserName(), argName, 407 "AdminService"); 408 return recordFactory.newRecordInstance(RefreshNodesResponse.class); 409 } catch (IOException ioe) { 410 throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); 411 } 412 } 413 414 @Override refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request)415 public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( 416 RefreshSuperUserGroupsConfigurationRequest request) 417 throws YarnException, IOException { 418 String argName = "refreshSuperUserGroupsConfiguration"; 419 UserGroupInformation user = checkAcls(argName); 420 421 checkRMStatus(user.getShortUserName(), argName, "refresh super-user-groups."); 422 423 // Accept hadoop common configs in core-site.xml as well as RM specific 424 // configurations in yarn-site.xml 425 Configuration conf = 426 getConfiguration(new Configuration(false), 427 YarnConfiguration.CORE_SITE_CONFIGURATION_FILE, 428 YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); 429 RMServerUtils.processRMProxyUsersConf(conf); 430 ProxyUsers.refreshSuperUserGroupsConfiguration(conf); 431 RMAuditLogger.logSuccess(user.getShortUserName(), 432 argName, "AdminService"); 433 434 return recordFactory.newRecordInstance( 435 RefreshSuperUserGroupsConfigurationResponse.class); 436 } 437 438 @Override refreshUserToGroupsMappings( RefreshUserToGroupsMappingsRequest request)439 public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( 440 RefreshUserToGroupsMappingsRequest request) 441 throws YarnException, IOException { 442 String argName = "refreshUserToGroupsMappings"; 443 UserGroupInformation user = checkAcls(argName); 444 445 checkRMStatus(user.getShortUserName(), argName, "refresh user-groups."); 446 447 Groups.getUserToGroupsMappingService( 448 getConfiguration(new Configuration(false), 449 YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh(); 450 451 RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); 452 453 return recordFactory.newRecordInstance( 454 RefreshUserToGroupsMappingsResponse.class); 455 } 456 457 @Override refreshAdminAcls( RefreshAdminAclsRequest request)458 public RefreshAdminAclsResponse refreshAdminAcls( 459 RefreshAdminAclsRequest request) throws YarnException, IOException { 460 return refreshAdminAcls(true); 461 } 462 refreshAdminAcls(boolean checkRMHAState)463 private RefreshAdminAclsResponse refreshAdminAcls(boolean checkRMHAState) 464 throws YarnException, IOException { 465 String argName = "refreshAdminAcls"; 466 UserGroupInformation user = checkAcls(argName); 467 468 if (checkRMHAState) { 469 checkRMStatus(user.getShortUserName(), argName, "refresh Admin ACLs."); 470 } 471 Configuration conf = 472 getConfiguration(new Configuration(false), 473 YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); 474 authorizer.setAdmins(getAdminAclList(conf), UserGroupInformation 475 .getCurrentUser()); 476 RMAuditLogger.logSuccess(user.getShortUserName(), argName, 477 "AdminService"); 478 479 return recordFactory.newRecordInstance(RefreshAdminAclsResponse.class); 480 } 481 482 @Override refreshServiceAcls( RefreshServiceAclsRequest request)483 public RefreshServiceAclsResponse refreshServiceAcls( 484 RefreshServiceAclsRequest request) throws YarnException, IOException { 485 if (!getConfig().getBoolean( 486 CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 487 false)) { 488 throw RPCUtil.getRemoteException( 489 new IOException("Service Authorization (" + 490 CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION + 491 ") not enabled.")); 492 } 493 494 String argName = "refreshServiceAcls"; 495 UserGroupInformation user = checkAcls(argName); 496 497 checkRMStatus(user.getShortUserName(), argName, "refresh Service ACLs."); 498 499 PolicyProvider policyProvider = RMPolicyProvider.getInstance(); 500 Configuration conf = 501 getConfiguration(new Configuration(false), 502 YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); 503 504 refreshServiceAcls(conf, policyProvider); 505 rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); 506 rmContext.getApplicationMasterService().refreshServiceAcls( 507 conf, policyProvider); 508 rmContext.getResourceTrackerService().refreshServiceAcls( 509 conf, policyProvider); 510 511 RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); 512 513 return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); 514 } 515 refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider)516 private synchronized void refreshServiceAcls(Configuration configuration, 517 PolicyProvider policyProvider) { 518 this.server.refreshServiceAclWithLoadedConfiguration(configuration, 519 policyProvider); 520 } 521 522 @Override getGroupsForUser(String user)523 public String[] getGroupsForUser(String user) throws IOException { 524 return UserGroupInformation.createRemoteUser(user).getGroupNames(); 525 } 526 527 @SuppressWarnings("unchecked") 528 @Override updateNodeResource( UpdateNodeResourceRequest request)529 public UpdateNodeResourceResponse updateNodeResource( 530 UpdateNodeResourceRequest request) throws YarnException, IOException { 531 String argName = "updateNodeResource"; 532 UserGroupInformation user = checkAcls(argName); 533 534 checkRMStatus(user.getShortUserName(), argName, "update node resource."); 535 536 Map<NodeId, ResourceOption> nodeResourceMap = request.getNodeResourceMap(); 537 Set<NodeId> nodeIds = nodeResourceMap.keySet(); 538 // verify nodes are all valid first. 539 // if any invalid nodes, throw exception instead of partially updating 540 // valid nodes. 541 for (NodeId nodeId : nodeIds) { 542 RMNode node = this.rmContext.getRMNodes().get(nodeId); 543 if (node == null) { 544 LOG.error("Resource update get failed on all nodes due to change " 545 + "resource on an unrecognized node: " + nodeId); 546 throw RPCUtil.getRemoteException( 547 "Resource update get failed on all nodes due to change resource " 548 + "on an unrecognized node: " + nodeId); 549 } 550 } 551 552 // do resource update on each node. 553 // Notice: it is still possible to have invalid NodeIDs as nodes decommission 554 // may happen just at the same time. This time, only log and skip absent 555 // nodes without throwing any exceptions. 556 boolean allSuccess = true; 557 for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) { 558 ResourceOption newResourceOption = entry.getValue(); 559 NodeId nodeId = entry.getKey(); 560 RMNode node = this.rmContext.getRMNodes().get(nodeId); 561 562 if (node == null) { 563 LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); 564 allSuccess = false; 565 } else { 566 // update resource to RMNode 567 this.rmContext.getDispatcher().getEventHandler() 568 .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); 569 LOG.info("Update resource on node(" + node.getNodeID() 570 + ") with resource(" + newResourceOption.toString() + ")"); 571 572 } 573 } 574 if (allSuccess) { 575 RMAuditLogger.logSuccess(user.getShortUserName(), argName, 576 "AdminService"); 577 } 578 UpdateNodeResourceResponse response = 579 UpdateNodeResourceResponse.newInstance(); 580 return response; 581 } 582 getConfiguration(Configuration conf, String... confFileNames)583 private synchronized Configuration getConfiguration(Configuration conf, 584 String... confFileNames) throws YarnException, IOException { 585 for (String confFileName : confFileNames) { 586 InputStream confFileInputStream = this.rmContext.getConfigurationProvider() 587 .getConfigurationInputStream(conf, confFileName); 588 if (confFileInputStream != null) { 589 conf.addResource(confFileInputStream); 590 } 591 } 592 return conf; 593 } 594 refreshAll()595 private void refreshAll() throws ServiceFailedException { 596 try { 597 refreshQueues(RefreshQueuesRequest.newInstance()); 598 refreshNodes(RefreshNodesRequest.newInstance()); 599 refreshSuperUserGroupsConfiguration( 600 RefreshSuperUserGroupsConfigurationRequest.newInstance()); 601 refreshUserToGroupsMappings( 602 RefreshUserToGroupsMappingsRequest.newInstance()); 603 if (getConfig().getBoolean( 604 CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 605 false)) { 606 refreshServiceAcls(RefreshServiceAclsRequest.newInstance()); 607 } 608 } catch (Exception ex) { 609 throw new ServiceFailedException(ex.getMessage()); 610 } 611 } 612 613 // only for testing 614 @VisibleForTesting getAccessControlList()615 public AccessControlList getAccessControlList() { 616 return ((ConfiguredYarnAuthorizer)authorizer).getAdminAcls(); 617 } 618 619 @VisibleForTesting getServer()620 public Server getServer() { 621 return this.server; 622 } 623 624 @Override addToClusterNodeLabels(AddToClusterNodeLabelsRequest request)625 public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) 626 throws YarnException, IOException { 627 String argName = "addToClusterNodeLabels"; 628 final String msg = "add labels."; 629 UserGroupInformation user = checkAcls(argName); 630 631 checkRMStatus(user.getShortUserName(), argName, msg); 632 633 AddToClusterNodeLabelsResponse response = 634 recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); 635 try { 636 rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels()); 637 RMAuditLogger 638 .logSuccess(user.getShortUserName(), argName, "AdminService"); 639 return response; 640 } catch (IOException ioe) { 641 throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); 642 } 643 } 644 645 @Override removeFromClusterNodeLabels( RemoveFromClusterNodeLabelsRequest request)646 public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( 647 RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException { 648 String argName = "removeFromClusterNodeLabels"; 649 final String msg = "remove labels."; 650 UserGroupInformation user = checkAcls(argName); 651 652 checkRMStatus(user.getShortUserName(), argName, msg); 653 654 RemoveFromClusterNodeLabelsResponse response = 655 recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); 656 try { 657 rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); 658 RMAuditLogger 659 .logSuccess(user.getShortUserName(), argName, "AdminService"); 660 return response; 661 } catch (IOException ioe) { 662 throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); 663 } 664 } 665 666 @Override replaceLabelsOnNode( ReplaceLabelsOnNodeRequest request)667 public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( 668 ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { 669 String argName = "replaceLabelsOnNode"; 670 final String msg = "set node to labels."; 671 UserGroupInformation user = checkAcls(argName); 672 673 checkRMStatus(user.getShortUserName(), argName, msg); 674 675 ReplaceLabelsOnNodeResponse response = 676 recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class); 677 try { 678 rmContext.getNodeLabelManager().replaceLabelsOnNode( 679 request.getNodeToLabels()); 680 RMAuditLogger 681 .logSuccess(user.getShortUserName(), argName, "AdminService"); 682 return response; 683 } catch (IOException ioe) { 684 throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); 685 } 686 } 687 checkRMStatus(String user, String argName, String msg)688 private void checkRMStatus(String user, String argName, String msg) 689 throws StandbyException { 690 if (!isRMActive()) { 691 RMAuditLogger.logFailure(user, argName, "", 692 "AdminService", "ResourceManager is not active. Can not " + msg); 693 throwStandbyException(); 694 } 695 } 696 logAndWrapException(IOException ioe, String user, String argName, String msg)697 private YarnException logAndWrapException(IOException ioe, String user, 698 String argName, String msg) throws YarnException { 699 LOG.info("Exception " + msg, ioe); 700 RMAuditLogger.logFailure(user, argName, "", 701 "AdminService", "Exception " + msg); 702 return RPCUtil.getRemoteException(ioe); 703 } 704 getHAZookeeperConnectionState()705 public String getHAZookeeperConnectionState() { 706 if (!rmContext.isHAEnabled()) { 707 return "ResourceManager HA is not enabled."; 708 } else if (!autoFailoverEnabled) { 709 return "Auto Failover is not enabled."; 710 } 711 return this.embeddedElector.getHAZookeeperConnectionState(); 712 } 713 } 714