1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager;
20 
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.net.InetSocketAddress;
24 import java.util.Map;
25 import java.util.Set;
26 
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
32 import org.apache.hadoop.ha.HAServiceProtocol;
33 import org.apache.hadoop.ha.HAServiceStatus;
34 import org.apache.hadoop.ha.HealthCheckFailedException;
35 import org.apache.hadoop.ha.ServiceFailedException;
36 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
37 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
38 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
39 import org.apache.hadoop.ipc.ProtobufRpcEngine;
40 import org.apache.hadoop.ipc.RPC;
41 import org.apache.hadoop.ipc.RPC.Server;
42 import org.apache.hadoop.ipc.StandbyException;
43 import org.apache.hadoop.security.AccessControlException;
44 import org.apache.hadoop.security.Groups;
45 import org.apache.hadoop.security.UserGroupInformation;
46 import org.apache.hadoop.security.authorize.AccessControlList;
47 import org.apache.hadoop.security.authorize.PolicyProvider;
48 import org.apache.hadoop.security.authorize.ProxyUsers;
49 import org.apache.hadoop.service.CompositeService;
50 import org.apache.hadoop.yarn.api.records.NodeId;
51 import org.apache.hadoop.yarn.api.records.ResourceOption;
52 import org.apache.hadoop.yarn.conf.HAUtil;
53 import org.apache.hadoop.yarn.conf.YarnConfiguration;
54 import org.apache.hadoop.yarn.exceptions.YarnException;
55 import org.apache.hadoop.yarn.factories.RecordFactory;
56 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
57 import org.apache.hadoop.yarn.ipc.RPCUtil;
58 import org.apache.hadoop.yarn.ipc.YarnRPC;
59 import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
60 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
61 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
62 import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
63 import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
64 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
65 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
66 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
67 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
68 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
69 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
70 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
71 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
72 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
73 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
74 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
75 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
76 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
77 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
78 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
79 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
80 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
81 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
82 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
83 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
84 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
85 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
86 
87 import com.google.common.annotations.VisibleForTesting;
88 import com.google.protobuf.BlockingService;
89 
90 public class AdminService extends CompositeService implements
91     HAServiceProtocol, ResourceManagerAdministrationProtocol {
92 
93   private static final Log LOG = LogFactory.getLog(AdminService.class);
94 
95   private final RMContext rmContext;
96   private final ResourceManager rm;
97   private String rmId;
98 
99   private boolean autoFailoverEnabled;
100   private EmbeddedElectorService embeddedElector;
101 
102   private Server server;
103 
104   // Address to use for binding. May be a wildcard address.
105   private InetSocketAddress masterServiceBindAddress;
106 
107   private YarnAuthorizationProvider authorizer;
108 
109   private final RecordFactory recordFactory =
110     RecordFactoryProvider.getRecordFactory(null);
111 
112   private UserGroupInformation daemonUser;
113 
AdminService(ResourceManager rm, RMContext rmContext)114   public AdminService(ResourceManager rm, RMContext rmContext) {
115     super(AdminService.class.getName());
116     this.rm = rm;
117     this.rmContext = rmContext;
118   }
119 
120   @Override
serviceInit(Configuration conf)121   public void serviceInit(Configuration conf) throws Exception {
122     if (rmContext.isHAEnabled()) {
123       autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
124       if (autoFailoverEnabled) {
125         if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
126           embeddedElector = createEmbeddedElectorService();
127           addIfService(embeddedElector);
128         }
129       }
130     }
131 
132     masterServiceBindAddress = conf.getSocketAddr(
133         YarnConfiguration.RM_BIND_HOST,
134         YarnConfiguration.RM_ADMIN_ADDRESS,
135         YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
136         YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
137     daemonUser = UserGroupInformation.getCurrentUser();
138     authorizer = YarnAuthorizationProvider.getInstance(conf);
139     authorizer.setAdmins(getAdminAclList(conf), UserGroupInformation
140         .getCurrentUser());
141     rmId = conf.get(YarnConfiguration.RM_HA_ID);
142     super.serviceInit(conf);
143   }
144 
getAdminAclList(Configuration conf)145   private AccessControlList getAdminAclList(Configuration conf) {
146     AccessControlList aclList =
147         new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
148           YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
149     aclList.addUser(daemonUser.getShortUserName());
150     return aclList;
151   }
152 
153   @Override
serviceStart()154   protected void serviceStart() throws Exception {
155     startServer();
156     super.serviceStart();
157   }
158 
159   @Override
serviceStop()160   protected void serviceStop() throws Exception {
161     stopServer();
162     super.serviceStop();
163   }
164 
startServer()165   protected void startServer() throws Exception {
166     Configuration conf = getConfig();
167     YarnRPC rpc = YarnRPC.create(conf);
168     this.server = (Server) rpc.getServer(
169         ResourceManagerAdministrationProtocol.class, this, masterServiceBindAddress,
170         conf, null,
171         conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
172             YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
173 
174     // Enable service authorization?
175     if (conf.getBoolean(
176         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
177         false)) {
178       refreshServiceAcls(
179           getConfiguration(conf,
180               YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE),
181           RMPolicyProvider.getInstance());
182     }
183 
184     if (rmContext.isHAEnabled()) {
185       RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
186           ProtobufRpcEngine.class);
187 
188       HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
189           new HAServiceProtocolServerSideTranslatorPB(this);
190       BlockingService haPbService =
191           HAServiceProtocolProtos.HAServiceProtocolService
192               .newReflectiveBlockingService(haServiceProtocolXlator);
193       server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
194           HAServiceProtocol.class, haPbService);
195     }
196 
197     this.server.start();
198     conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
199                            YarnConfiguration.RM_ADMIN_ADDRESS,
200                            YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
201                            server.getListenerAddress());
202   }
203 
stopServer()204   protected void stopServer() throws Exception {
205     if (this.server != null) {
206       this.server.stop();
207     }
208   }
209 
createEmbeddedElectorService()210   protected EmbeddedElectorService createEmbeddedElectorService() {
211     return new EmbeddedElectorService(rmContext);
212   }
213 
214   @InterfaceAudience.Private
resetLeaderElection()215   void resetLeaderElection() {
216     if (embeddedElector != null) {
217       embeddedElector.resetLeaderElection();
218     }
219   }
220 
checkAccess(String method)221   private UserGroupInformation checkAccess(String method) throws IOException {
222     return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
223   }
224 
checkAcls(String method)225   private UserGroupInformation checkAcls(String method) throws YarnException {
226     try {
227       return checkAccess(method);
228     } catch (IOException ioe) {
229       throw RPCUtil.getRemoteException(ioe);
230     }
231   }
232 
233   /**
234    * Check that a request to change this node's HA state is valid.
235    * In particular, verifies that, if auto failover is enabled, non-forced
236    * requests from the HAAdmin CLI are rejected, and vice versa.
237    *
238    * @param req the request to check
239    * @throws AccessControlException if the request is disallowed
240    */
checkHaStateChange(StateChangeRequestInfo req)241   private void checkHaStateChange(StateChangeRequestInfo req)
242       throws AccessControlException {
243     switch (req.getSource()) {
244       case REQUEST_BY_USER:
245         if (autoFailoverEnabled) {
246           throw new AccessControlException(
247               "Manual failover for this ResourceManager is disallowed, " +
248                   "because automatic failover is enabled.");
249         }
250         break;
251       case REQUEST_BY_USER_FORCED:
252         if (autoFailoverEnabled) {
253           LOG.warn("Allowing manual failover from " +
254               org.apache.hadoop.ipc.Server.getRemoteAddress() +
255               " even though automatic failover is enabled, because the user " +
256               "specified the force flag");
257         }
258         break;
259       case REQUEST_BY_ZKFC:
260         if (!autoFailoverEnabled) {
261           throw new AccessControlException(
262               "Request from ZK failover controller at " +
263                   org.apache.hadoop.ipc.Server.getRemoteAddress() + " denied " +
264                   "since automatic failover is not enabled");
265         }
266         break;
267     }
268   }
269 
isRMActive()270   private synchronized boolean isRMActive() {
271     return HAServiceState.ACTIVE == rmContext.getHAServiceState();
272   }
273 
throwStandbyException()274   private void throwStandbyException() throws StandbyException {
275     throw new StandbyException("ResourceManager " + rmId + " is not Active!");
276   }
277 
278   @Override
monitorHealth()279   public synchronized void monitorHealth()
280       throws IOException {
281     checkAccess("monitorHealth");
282     if (isRMActive() && !rm.areActiveServicesRunning()) {
283       throw new HealthCheckFailedException(
284           "Active ResourceManager services are not running!");
285     }
286   }
287 
288   @SuppressWarnings("unchecked")
289   @Override
transitionToActive( HAServiceProtocol.StateChangeRequestInfo reqInfo)290   public synchronized void transitionToActive(
291       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
292     // call refreshAdminAcls before HA state transition
293     // for the case that adminAcls have been updated in previous active RM
294     try {
295       refreshAdminAcls(false);
296     } catch (YarnException ex) {
297       throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
298     }
299 
300     UserGroupInformation user = checkAccess("transitionToActive");
301     checkHaStateChange(reqInfo);
302     try {
303       rm.transitionToActive();
304     } catch (Exception e) {
305       RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
306           "", "RMHAProtocolService",
307           "Exception transitioning to active");
308       throw new ServiceFailedException(
309           "Error when transitioning to Active mode", e);
310     }
311     try {
312       // call all refresh*s for active RM to get the updated configurations.
313       refreshAll();
314     } catch (Exception e) {
315       LOG.error("RefreshAll failed so firing fatal event", e);
316       rmContext
317           .getDispatcher()
318           .getEventHandler()
319           .handle(
320           new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e));
321       throw new ServiceFailedException(
322           "Error on refreshAll during transistion to Active", e);
323     }
324     RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
325         "RMHAProtocolService");
326   }
327 
328   @Override
transitionToStandby( HAServiceProtocol.StateChangeRequestInfo reqInfo)329   public synchronized void transitionToStandby(
330       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
331     // call refreshAdminAcls before HA state transition
332     // for the case that adminAcls have been updated in previous active RM
333     try {
334       refreshAdminAcls(false);
335     } catch (YarnException ex) {
336       throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
337     }
338     UserGroupInformation user = checkAccess("transitionToStandby");
339     checkHaStateChange(reqInfo);
340     try {
341       rm.transitionToStandby(true);
342       RMAuditLogger.logSuccess(user.getShortUserName(),
343           "transitionToStandby", "RMHAProtocolService");
344     } catch (Exception e) {
345       RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
346           "", "RMHAProtocolService",
347           "Exception transitioning to standby");
348       throw new ServiceFailedException(
349           "Error when transitioning to Standby mode", e);
350     }
351   }
352 
353   @Override
getServiceStatus()354   public synchronized HAServiceStatus getServiceStatus() throws IOException {
355     checkAccess("getServiceState");
356     HAServiceState haState = rmContext.getHAServiceState();
357     HAServiceStatus ret = new HAServiceStatus(haState);
358     if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
359       ret.setReadyToBecomeActive();
360     } else {
361       ret.setNotReadyToBecomeActive("State is " + haState);
362     }
363     return ret;
364   }
365 
366   @Override
refreshQueues(RefreshQueuesRequest request)367   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
368       throws YarnException, StandbyException {
369     String argName = "refreshQueues";
370     final String msg = "refresh queues.";
371     UserGroupInformation user = checkAcls(argName);
372 
373     checkRMStatus(user.getShortUserName(), argName, msg);
374 
375     RefreshQueuesResponse response =
376         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
377     try {
378       rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
379       // refresh the reservation system
380       ReservationSystem rSystem = rmContext.getReservationSystem();
381       if (rSystem != null) {
382         rSystem.reinitialize(getConfig(), rmContext);
383       }
384       RMAuditLogger.logSuccess(user.getShortUserName(), argName,
385           "AdminService");
386       return response;
387     } catch (IOException ioe) {
388       throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
389     }
390   }
391 
392   @Override
refreshNodes(RefreshNodesRequest request)393   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
394       throws YarnException, StandbyException {
395     String argName = "refreshNodes";
396     final String msg = "refresh nodes.";
397     UserGroupInformation user = checkAcls("refreshNodes");
398 
399     checkRMStatus(user.getShortUserName(), argName, msg);
400 
401     try {
402       Configuration conf =
403           getConfiguration(new Configuration(false),
404               YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
405       rmContext.getNodesListManager().refreshNodes(conf);
406       RMAuditLogger.logSuccess(user.getShortUserName(), argName,
407           "AdminService");
408       return recordFactory.newRecordInstance(RefreshNodesResponse.class);
409     } catch (IOException ioe) {
410       throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
411     }
412   }
413 
414   @Override
refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request)415   public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
416       RefreshSuperUserGroupsConfigurationRequest request)
417       throws YarnException, IOException {
418     String argName = "refreshSuperUserGroupsConfiguration";
419     UserGroupInformation user = checkAcls(argName);
420 
421     checkRMStatus(user.getShortUserName(), argName, "refresh super-user-groups.");
422 
423     // Accept hadoop common configs in core-site.xml as well as RM specific
424     // configurations in yarn-site.xml
425     Configuration conf =
426         getConfiguration(new Configuration(false),
427             YarnConfiguration.CORE_SITE_CONFIGURATION_FILE,
428             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
429     RMServerUtils.processRMProxyUsersConf(conf);
430     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
431     RMAuditLogger.logSuccess(user.getShortUserName(),
432         argName, "AdminService");
433 
434     return recordFactory.newRecordInstance(
435         RefreshSuperUserGroupsConfigurationResponse.class);
436   }
437 
438   @Override
refreshUserToGroupsMappings( RefreshUserToGroupsMappingsRequest request)439   public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
440       RefreshUserToGroupsMappingsRequest request)
441       throws YarnException, IOException {
442     String argName = "refreshUserToGroupsMappings";
443     UserGroupInformation user = checkAcls(argName);
444 
445     checkRMStatus(user.getShortUserName(), argName, "refresh user-groups.");
446 
447     Groups.getUserToGroupsMappingService(
448         getConfiguration(new Configuration(false),
449             YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
450 
451     RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService");
452 
453     return recordFactory.newRecordInstance(
454         RefreshUserToGroupsMappingsResponse.class);
455   }
456 
457   @Override
refreshAdminAcls( RefreshAdminAclsRequest request)458   public RefreshAdminAclsResponse refreshAdminAcls(
459       RefreshAdminAclsRequest request) throws YarnException, IOException {
460     return refreshAdminAcls(true);
461   }
462 
refreshAdminAcls(boolean checkRMHAState)463   private RefreshAdminAclsResponse refreshAdminAcls(boolean checkRMHAState)
464       throws YarnException, IOException {
465     String argName = "refreshAdminAcls";
466     UserGroupInformation user = checkAcls(argName);
467 
468     if (checkRMHAState) {
469       checkRMStatus(user.getShortUserName(), argName, "refresh Admin ACLs.");
470     }
471     Configuration conf =
472         getConfiguration(new Configuration(false),
473             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
474     authorizer.setAdmins(getAdminAclList(conf), UserGroupInformation
475         .getCurrentUser());
476     RMAuditLogger.logSuccess(user.getShortUserName(), argName,
477         "AdminService");
478 
479     return recordFactory.newRecordInstance(RefreshAdminAclsResponse.class);
480   }
481 
482   @Override
refreshServiceAcls( RefreshServiceAclsRequest request)483   public RefreshServiceAclsResponse refreshServiceAcls(
484       RefreshServiceAclsRequest request) throws YarnException, IOException {
485     if (!getConfig().getBoolean(
486              CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
487              false)) {
488       throw RPCUtil.getRemoteException(
489           new IOException("Service Authorization (" +
490               CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
491               ") not enabled."));
492     }
493 
494     String argName = "refreshServiceAcls";
495     UserGroupInformation user = checkAcls(argName);
496 
497     checkRMStatus(user.getShortUserName(), argName, "refresh Service ACLs.");
498 
499     PolicyProvider policyProvider = RMPolicyProvider.getInstance();
500     Configuration conf =
501         getConfiguration(new Configuration(false),
502             YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
503 
504     refreshServiceAcls(conf, policyProvider);
505     rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
506     rmContext.getApplicationMasterService().refreshServiceAcls(
507         conf, policyProvider);
508     rmContext.getResourceTrackerService().refreshServiceAcls(
509         conf, policyProvider);
510 
511     RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService");
512 
513     return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
514   }
515 
refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider)516   private synchronized void refreshServiceAcls(Configuration configuration,
517       PolicyProvider policyProvider) {
518     this.server.refreshServiceAclWithLoadedConfiguration(configuration,
519         policyProvider);
520   }
521 
522   @Override
getGroupsForUser(String user)523   public String[] getGroupsForUser(String user) throws IOException {
524     return UserGroupInformation.createRemoteUser(user).getGroupNames();
525   }
526 
527   @SuppressWarnings("unchecked")
528   @Override
updateNodeResource( UpdateNodeResourceRequest request)529   public UpdateNodeResourceResponse updateNodeResource(
530       UpdateNodeResourceRequest request) throws YarnException, IOException {
531     String argName = "updateNodeResource";
532     UserGroupInformation user = checkAcls(argName);
533 
534     checkRMStatus(user.getShortUserName(), argName, "update node resource.");
535 
536     Map<NodeId, ResourceOption> nodeResourceMap = request.getNodeResourceMap();
537     Set<NodeId> nodeIds = nodeResourceMap.keySet();
538     // verify nodes are all valid first.
539     // if any invalid nodes, throw exception instead of partially updating
540     // valid nodes.
541     for (NodeId nodeId : nodeIds) {
542       RMNode node = this.rmContext.getRMNodes().get(nodeId);
543       if (node == null) {
544         LOG.error("Resource update get failed on all nodes due to change "
545             + "resource on an unrecognized node: " + nodeId);
546         throw RPCUtil.getRemoteException(
547             "Resource update get failed on all nodes due to change resource "
548                 + "on an unrecognized node: " + nodeId);
549       }
550     }
551 
552     // do resource update on each node.
553     // Notice: it is still possible to have invalid NodeIDs as nodes decommission
554     // may happen just at the same time. This time, only log and skip absent
555     // nodes without throwing any exceptions.
556     boolean allSuccess = true;
557     for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
558       ResourceOption newResourceOption = entry.getValue();
559       NodeId nodeId = entry.getKey();
560       RMNode node = this.rmContext.getRMNodes().get(nodeId);
561 
562       if (node == null) {
563         LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
564         allSuccess = false;
565       } else {
566         // update resource to RMNode
567         this.rmContext.getDispatcher().getEventHandler()
568           .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
569         LOG.info("Update resource on node(" + node.getNodeID()
570             + ") with resource(" + newResourceOption.toString() + ")");
571 
572       }
573     }
574     if (allSuccess) {
575       RMAuditLogger.logSuccess(user.getShortUserName(), argName,
576           "AdminService");
577     }
578     UpdateNodeResourceResponse response =
579         UpdateNodeResourceResponse.newInstance();
580     return response;
581   }
582 
getConfiguration(Configuration conf, String... confFileNames)583   private synchronized Configuration getConfiguration(Configuration conf,
584       String... confFileNames) throws YarnException, IOException {
585     for (String confFileName : confFileNames) {
586       InputStream confFileInputStream = this.rmContext.getConfigurationProvider()
587           .getConfigurationInputStream(conf, confFileName);
588       if (confFileInputStream != null) {
589         conf.addResource(confFileInputStream);
590       }
591     }
592     return conf;
593   }
594 
refreshAll()595   private void refreshAll() throws ServiceFailedException {
596     try {
597       refreshQueues(RefreshQueuesRequest.newInstance());
598       refreshNodes(RefreshNodesRequest.newInstance());
599       refreshSuperUserGroupsConfiguration(
600           RefreshSuperUserGroupsConfigurationRequest.newInstance());
601       refreshUserToGroupsMappings(
602           RefreshUserToGroupsMappingsRequest.newInstance());
603       if (getConfig().getBoolean(
604           CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
605           false)) {
606         refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
607       }
608     } catch (Exception ex) {
609       throw new ServiceFailedException(ex.getMessage());
610     }
611   }
612 
613   // only for testing
614   @VisibleForTesting
getAccessControlList()615   public AccessControlList getAccessControlList() {
616     return ((ConfiguredYarnAuthorizer)authorizer).getAdminAcls();
617   }
618 
619   @VisibleForTesting
getServer()620   public Server getServer() {
621     return this.server;
622   }
623 
624   @Override
addToClusterNodeLabels(AddToClusterNodeLabelsRequest request)625   public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request)
626       throws YarnException, IOException {
627     String argName = "addToClusterNodeLabels";
628     final String msg = "add labels.";
629     UserGroupInformation user = checkAcls(argName);
630 
631     checkRMStatus(user.getShortUserName(), argName, msg);
632 
633     AddToClusterNodeLabelsResponse response =
634         recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
635     try {
636       rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels());
637       RMAuditLogger
638           .logSuccess(user.getShortUserName(), argName, "AdminService");
639       return response;
640     } catch (IOException ioe) {
641       throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
642     }
643   }
644 
645   @Override
removeFromClusterNodeLabels( RemoveFromClusterNodeLabelsRequest request)646   public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
647       RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException {
648     String argName = "removeFromClusterNodeLabels";
649     final String msg = "remove labels.";
650     UserGroupInformation user = checkAcls(argName);
651 
652     checkRMStatus(user.getShortUserName(), argName, msg);
653 
654     RemoveFromClusterNodeLabelsResponse response =
655         recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
656     try {
657       rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
658       RMAuditLogger
659           .logSuccess(user.getShortUserName(), argName, "AdminService");
660       return response;
661     } catch (IOException ioe) {
662       throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
663     }
664   }
665 
666   @Override
replaceLabelsOnNode( ReplaceLabelsOnNodeRequest request)667   public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
668       ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
669     String argName = "replaceLabelsOnNode";
670     final String msg = "set node to labels.";
671     UserGroupInformation user = checkAcls(argName);
672 
673     checkRMStatus(user.getShortUserName(), argName, msg);
674 
675     ReplaceLabelsOnNodeResponse response =
676         recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class);
677     try {
678       rmContext.getNodeLabelManager().replaceLabelsOnNode(
679           request.getNodeToLabels());
680       RMAuditLogger
681           .logSuccess(user.getShortUserName(), argName, "AdminService");
682       return response;
683     } catch (IOException ioe) {
684       throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
685     }
686   }
687 
checkRMStatus(String user, String argName, String msg)688   private void checkRMStatus(String user, String argName, String msg)
689       throws StandbyException {
690     if (!isRMActive()) {
691       RMAuditLogger.logFailure(user, argName, "",
692           "AdminService", "ResourceManager is not active. Can not " + msg);
693       throwStandbyException();
694     }
695   }
696 
logAndWrapException(IOException ioe, String user, String argName, String msg)697   private YarnException logAndWrapException(IOException ioe, String user,
698       String argName, String msg) throws YarnException {
699     LOG.info("Exception " + msg, ioe);
700     RMAuditLogger.logFailure(user, argName, "",
701         "AdminService", "Exception " + msg);
702     return RPCUtil.getRemoteException(ioe);
703   }
704 
getHAZookeeperConnectionState()705   public String getHAZookeeperConnectionState() {
706     if (!rmContext.isHAEnabled()) {
707       return "ResourceManager HA is not enabled.";
708     } else if (!autoFailoverEnabled) {
709       return "Auto Failover is not enabled.";
710     }
711     return this.embeddedElector.getHAZookeeperConnectionState();
712   }
713 }
714