1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager.webapp; 20 21 import java.io.IOException; 22 import java.lang.reflect.UndeclaredThrowableException; 23 import java.security.AccessControlException; 24 import java.nio.ByteBuffer; 25 import java.security.Principal; 26 import java.security.PrivilegedExceptionAction; 27 import java.util.Arrays; 28 import java.util.Collection; 29 import java.util.EnumSet; 30 import java.util.HashMap; 31 import java.util.HashSet; 32 import java.util.List; 33 import java.util.Map; 34 import java.util.Set; 35 import java.util.concurrent.ConcurrentMap; 36 37 import javax.servlet.http.HttpServletRequest; 38 import javax.servlet.http.HttpServletResponse; 39 import javax.ws.rs.Consumes; 40 import javax.ws.rs.DELETE; 41 import javax.ws.rs.GET; 42 import javax.ws.rs.POST; 43 import javax.ws.rs.PUT; 44 import javax.ws.rs.Path; 45 import javax.ws.rs.PathParam; 46 import javax.ws.rs.Produces; 47 import javax.ws.rs.QueryParam; 48 import javax.ws.rs.core.Context; 49 import javax.ws.rs.core.HttpHeaders; 50 import javax.ws.rs.core.MediaType; 51 import javax.ws.rs.core.Response; 52 import javax.ws.rs.core.Response.Status; 53 54 import org.apache.commons.codec.binary.Base64; 55 import org.apache.commons.logging.Log; 56 import org.apache.commons.logging.LogFactory; 57 import org.apache.hadoop.conf.Configuration; 58 import org.apache.hadoop.fs.CommonConfigurationKeys; 59 import org.apache.hadoop.io.DataOutputBuffer; 60 import org.apache.hadoop.io.Text; 61 import org.apache.hadoop.security.Credentials; 62 import org.apache.hadoop.security.UserGroupInformation; 63 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 64 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; 65 import org.apache.hadoop.security.authorize.AuthorizationException; 66 import org.apache.hadoop.security.token.Token; 67 import org.apache.hadoop.security.token.TokenIdentifier; 68 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler; 69 import org.apache.hadoop.util.StringUtils; 70 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; 71 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; 72 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; 73 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; 74 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; 75 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; 76 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; 77 import org.apache.hadoop.security.token.SecretManager.InvalidToken; 78 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; 79 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; 80 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; 81 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; 82 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; 83 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; 84 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; 85 import org.apache.hadoop.yarn.api.records.ApplicationAccessType; 86 import org.apache.hadoop.yarn.api.records.ApplicationId; 87 import org.apache.hadoop.yarn.api.records.ApplicationReport; 88 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; 89 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 90 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 91 import org.apache.hadoop.yarn.api.records.LocalResource; 92 import org.apache.hadoop.yarn.api.records.NodeId; 93 import org.apache.hadoop.yarn.api.records.NodeState; 94 import org.apache.hadoop.yarn.api.records.Priority; 95 import org.apache.hadoop.yarn.api.records.QueueACL; 96 import org.apache.hadoop.yarn.api.records.Resource; 97 import org.apache.hadoop.yarn.api.records.YarnApplicationState; 98 import org.apache.hadoop.yarn.conf.YarnConfiguration; 99 import org.apache.hadoop.yarn.exceptions.YarnException; 100 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 101 import org.apache.hadoop.yarn.factories.RecordFactory; 102 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 103 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; 104 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; 105 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; 106 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; 107 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; 108 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; 109 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; 110 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; 111 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; 112 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; 113 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; 114 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; 115 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; 116 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; 117 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; 118 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication; 119 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; 120 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState; 121 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; 122 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo; 123 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; 124 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; 125 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; 126 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo; 127 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; 128 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo; 129 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken; 130 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo; 131 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo; 132 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo; 133 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo; 134 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; 135 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; 136 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; 137 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; 138 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; 139 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; 140 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; 141 import org.apache.hadoop.yarn.server.utils.BuilderUtils; 142 import org.apache.hadoop.yarn.util.ConverterUtils; 143 import org.apache.hadoop.yarn.webapp.BadRequestException; 144 import org.apache.hadoop.yarn.webapp.NotFoundException; 145 import org.apache.hadoop.yarn.webapp.util.WebAppUtils; 146 147 import com.google.inject.Inject; 148 import com.google.inject.Singleton; 149 150 @Singleton 151 @Path("/ws/v1/cluster") 152 public class RMWebServices { 153 private static final Log LOG = 154 LogFactory.getLog(RMWebServices.class.getName()); 155 private static final String EMPTY = ""; 156 private static final String ANY = "*"; 157 private final ResourceManager rm; 158 private static RecordFactory recordFactory = RecordFactoryProvider 159 .getRecordFactory(null); 160 private final Configuration conf; 161 private @Context HttpServletResponse response; 162 163 public final static String DELEGATION_TOKEN_HEADER = 164 "Hadoop-YARN-RM-Delegation-Token"; 165 166 @Inject RMWebServices(final ResourceManager rm, Configuration conf)167 public RMWebServices(final ResourceManager rm, Configuration conf) { 168 this.rm = rm; 169 this.conf = conf; 170 } 171 RMWebServices(ResourceManager rm, Configuration conf, HttpServletResponse response)172 RMWebServices(ResourceManager rm, Configuration conf, 173 HttpServletResponse response) { 174 this(rm, conf); 175 this.response = response; 176 } 177 hasAccess(RMApp app, HttpServletRequest hsr)178 protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) { 179 // Check for the authorization. 180 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 181 if (callerUGI != null 182 && !(this.rm.getApplicationACLsManager().checkAccess(callerUGI, 183 ApplicationAccessType.VIEW_APP, app.getUser(), 184 app.getApplicationId()) || 185 this.rm.getQueueACLsManager().checkAccess(callerUGI, 186 QueueACL.ADMINISTER_QUEUE, app.getQueue()))) { 187 return false; 188 } 189 return true; 190 } 191 init()192 private void init() { 193 //clear content type 194 response.setContentType(null); 195 } 196 197 @GET 198 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) get()199 public ClusterInfo get() { 200 return getClusterInfo(); 201 } 202 203 @GET 204 @Path("/info") 205 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getClusterInfo()206 public ClusterInfo getClusterInfo() { 207 init(); 208 return new ClusterInfo(this.rm); 209 } 210 211 @GET 212 @Path("/metrics") 213 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getClusterMetricsInfo()214 public ClusterMetricsInfo getClusterMetricsInfo() { 215 init(); 216 return new ClusterMetricsInfo(this.rm); 217 } 218 219 @GET 220 @Path("/scheduler") 221 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getSchedulerInfo()222 public SchedulerTypeInfo getSchedulerInfo() { 223 init(); 224 ResourceScheduler rs = rm.getResourceScheduler(); 225 SchedulerInfo sinfo; 226 if (rs instanceof CapacityScheduler) { 227 CapacityScheduler cs = (CapacityScheduler) rs; 228 CSQueue root = cs.getRootQueue(); 229 sinfo = new CapacitySchedulerInfo(root); 230 } else if (rs instanceof FairScheduler) { 231 FairScheduler fs = (FairScheduler) rs; 232 sinfo = new FairSchedulerInfo(fs); 233 } else if (rs instanceof FifoScheduler) { 234 sinfo = new FifoSchedulerInfo(this.rm); 235 } else { 236 throw new NotFoundException("Unknown scheduler configured"); 237 } 238 return new SchedulerTypeInfo(sinfo); 239 } 240 241 /** 242 * Returns all nodes in the cluster. If the states param is given, returns 243 * all nodes that are in the comma-separated list of states. 244 */ 245 @GET 246 @Path("/nodes") 247 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getNodes(@ueryParamR) String states)248 public NodesInfo getNodes(@QueryParam("states") String states) { 249 init(); 250 ResourceScheduler sched = this.rm.getResourceScheduler(); 251 if (sched == null) { 252 throw new NotFoundException("Null ResourceScheduler instance"); 253 } 254 255 EnumSet<NodeState> acceptedStates; 256 if (states == null) { 257 acceptedStates = EnumSet.allOf(NodeState.class); 258 } else { 259 acceptedStates = EnumSet.noneOf(NodeState.class); 260 for (String stateStr : states.split(",")) { 261 acceptedStates.add( 262 NodeState.valueOf(StringUtils.toUpperCase(stateStr))); 263 } 264 } 265 266 Collection<RMNode> rmNodes = RMServerUtils.queryRMNodes(this.rm.getRMContext(), 267 acceptedStates); 268 NodesInfo nodesInfo = new NodesInfo(); 269 for (RMNode rmNode : rmNodes) { 270 NodeInfo nodeInfo = new NodeInfo(rmNode, sched); 271 if (EnumSet.of(NodeState.LOST, NodeState.DECOMMISSIONED, NodeState.REBOOTED) 272 .contains(rmNode.getState())) { 273 nodeInfo.setNodeHTTPAddress(EMPTY); 274 } 275 nodesInfo.add(nodeInfo); 276 } 277 278 return nodesInfo; 279 } 280 281 @GET 282 @Path("/nodes/{nodeId}") 283 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getNode(@athParamR) String nodeId)284 public NodeInfo getNode(@PathParam("nodeId") String nodeId) { 285 init(); 286 if (nodeId == null || nodeId.isEmpty()) { 287 throw new NotFoundException("nodeId, " + nodeId + ", is empty or null"); 288 } 289 ResourceScheduler sched = this.rm.getResourceScheduler(); 290 if (sched == null) { 291 throw new NotFoundException("Null ResourceScheduler instance"); 292 } 293 NodeId nid = ConverterUtils.toNodeId(nodeId); 294 RMNode ni = this.rm.getRMContext().getRMNodes().get(nid); 295 boolean isInactive = false; 296 if (ni == null) { 297 ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost()); 298 if (ni == null) { 299 throw new NotFoundException("nodeId, " + nodeId + ", is not found"); 300 } 301 isInactive = true; 302 } 303 NodeInfo nodeInfo = new NodeInfo(ni, sched); 304 if (isInactive) { 305 nodeInfo.setNodeHTTPAddress(EMPTY); 306 } 307 return nodeInfo; 308 } 309 310 @GET 311 @Path("/apps") 312 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getApps(@ontext HttpServletRequest hsr, @QueryParam(R) String stateQuery, @QueryParam(R) Set<String> statesQuery, @QueryParam(R) String finalStatusQuery, @QueryParam(R) String userQuery, @QueryParam(R) String queueQuery, @QueryParam(R) String count, @QueryParam(R) String startedBegin, @QueryParam(R) String startedEnd, @QueryParam(R) String finishBegin, @QueryParam(R) String finishEnd, @QueryParam(R) Set<String> applicationTypes, @QueryParam(R) Set<String> applicationTags)313 public AppsInfo getApps(@Context HttpServletRequest hsr, 314 @QueryParam("state") String stateQuery, 315 @QueryParam("states") Set<String> statesQuery, 316 @QueryParam("finalStatus") String finalStatusQuery, 317 @QueryParam("user") String userQuery, 318 @QueryParam("queue") String queueQuery, 319 @QueryParam("limit") String count, 320 @QueryParam("startedTimeBegin") String startedBegin, 321 @QueryParam("startedTimeEnd") String startedEnd, 322 @QueryParam("finishedTimeBegin") String finishBegin, 323 @QueryParam("finishedTimeEnd") String finishEnd, 324 @QueryParam("applicationTypes") Set<String> applicationTypes, 325 @QueryParam("applicationTags") Set<String> applicationTags) { 326 boolean checkCount = false; 327 boolean checkStart = false; 328 boolean checkEnd = false; 329 boolean checkAppTypes = false; 330 boolean checkAppStates = false; 331 boolean checkAppTags = false; 332 long countNum = 0; 333 334 // set values suitable in case both of begin/end not specified 335 long sBegin = 0; 336 long sEnd = Long.MAX_VALUE; 337 long fBegin = 0; 338 long fEnd = Long.MAX_VALUE; 339 340 init(); 341 if (count != null && !count.isEmpty()) { 342 checkCount = true; 343 countNum = Long.parseLong(count); 344 if (countNum <= 0) { 345 throw new BadRequestException("limit value must be greater then 0"); 346 } 347 } 348 349 if (startedBegin != null && !startedBegin.isEmpty()) { 350 checkStart = true; 351 sBegin = Long.parseLong(startedBegin); 352 if (sBegin < 0) { 353 throw new BadRequestException("startedTimeBegin must be greater than 0"); 354 } 355 } 356 if (startedEnd != null && !startedEnd.isEmpty()) { 357 checkStart = true; 358 sEnd = Long.parseLong(startedEnd); 359 if (sEnd < 0) { 360 throw new BadRequestException("startedTimeEnd must be greater than 0"); 361 } 362 } 363 if (sBegin > sEnd) { 364 throw new BadRequestException( 365 "startedTimeEnd must be greater than startTimeBegin"); 366 } 367 368 if (finishBegin != null && !finishBegin.isEmpty()) { 369 checkEnd = true; 370 fBegin = Long.parseLong(finishBegin); 371 if (fBegin < 0) { 372 throw new BadRequestException("finishTimeBegin must be greater than 0"); 373 } 374 } 375 if (finishEnd != null && !finishEnd.isEmpty()) { 376 checkEnd = true; 377 fEnd = Long.parseLong(finishEnd); 378 if (fEnd < 0) { 379 throw new BadRequestException("finishTimeEnd must be greater than 0"); 380 } 381 } 382 if (fBegin > fEnd) { 383 throw new BadRequestException( 384 "finishTimeEnd must be greater than finishTimeBegin"); 385 } 386 387 Set<String> appTypes = parseQueries(applicationTypes, false); 388 if (!appTypes.isEmpty()) { 389 checkAppTypes = true; 390 } 391 392 Set<String> appTags = parseQueries(applicationTags, false); 393 if (!appTags.isEmpty()) { 394 checkAppTags = true; 395 } 396 397 // stateQuery is deprecated. 398 if (stateQuery != null && !stateQuery.isEmpty()) { 399 statesQuery.add(stateQuery); 400 } 401 Set<String> appStates = parseQueries(statesQuery, true); 402 if (!appStates.isEmpty()) { 403 checkAppStates = true; 404 } 405 406 GetApplicationsRequest request = GetApplicationsRequest.newInstance(); 407 408 if (checkStart) { 409 request.setStartRange(sBegin, sEnd); 410 } 411 412 if (checkEnd) { 413 request.setFinishRange(fBegin, fEnd); 414 } 415 416 if (checkCount) { 417 request.setLimit(countNum); 418 } 419 420 if (checkAppTypes) { 421 request.setApplicationTypes(appTypes); 422 } 423 424 if (checkAppTags) { 425 request.setApplicationTags(appTags); 426 } 427 428 if (checkAppStates) { 429 request.setApplicationStates(appStates); 430 } 431 432 if (queueQuery != null && !queueQuery.isEmpty()) { 433 ResourceScheduler rs = rm.getResourceScheduler(); 434 if (rs instanceof CapacityScheduler) { 435 CapacityScheduler cs = (CapacityScheduler) rs; 436 // validate queue exists 437 try { 438 cs.getQueueInfo(queueQuery, false, false); 439 } catch (IOException e) { 440 throw new BadRequestException(e.getMessage()); 441 } 442 } 443 Set<String> queues = new HashSet<String>(1); 444 queues.add(queueQuery); 445 request.setQueues(queues); 446 } 447 448 if (userQuery != null && !userQuery.isEmpty()) { 449 Set<String> users = new HashSet<String>(1); 450 users.add(userQuery); 451 request.setUsers(users); 452 } 453 454 List<ApplicationReport> appReports = null; 455 try { 456 appReports = rm.getClientRMService() 457 .getApplications(request, false).getApplicationList(); 458 } catch (YarnException e) { 459 LOG.error("Unable to retrieve apps from ClientRMService", e); 460 throw new YarnRuntimeException( 461 "Unable to retrieve apps from ClientRMService", e); 462 } 463 464 final ConcurrentMap<ApplicationId, RMApp> apps = 465 rm.getRMContext().getRMApps(); 466 AppsInfo allApps = new AppsInfo(); 467 for (ApplicationReport report : appReports) { 468 RMApp rmapp = apps.get(report.getApplicationId()); 469 if (rmapp == null) { 470 continue; 471 } 472 473 if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) { 474 FinalApplicationStatus.valueOf(finalStatusQuery); 475 if (!rmapp.getFinalApplicationStatus().toString() 476 .equalsIgnoreCase(finalStatusQuery)) { 477 continue; 478 } 479 } 480 481 AppInfo app = new AppInfo(rm, rmapp, 482 hasAccess(rmapp, hsr), WebAppUtils.getHttpSchemePrefix(conf)); 483 allApps.add(app); 484 } 485 return allApps; 486 } 487 488 @GET 489 @Path("/appstatistics") 490 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getAppStatistics( @ontext HttpServletRequest hsr, @QueryParam(R) Set<String> stateQueries, @QueryParam(R) Set<String> typeQueries)491 public ApplicationStatisticsInfo getAppStatistics( 492 @Context HttpServletRequest hsr, 493 @QueryParam("states") Set<String> stateQueries, 494 @QueryParam("applicationTypes") Set<String> typeQueries) { 495 init(); 496 497 // parse the params and build the scoreboard 498 // converting state/type name to lowercase 499 Set<String> states = parseQueries(stateQueries, true); 500 Set<String> types = parseQueries(typeQueries, false); 501 // if no types, counts the applications of any types 502 if (types.size() == 0) { 503 types.add(ANY); 504 } else if (types.size() != 1) { 505 throw new BadRequestException("# of applicationTypes = " + types.size() 506 + ", we temporarily support at most one applicationType"); 507 } 508 // if no states, returns the counts of all RMAppStates 509 if (states.size() == 0) { 510 for (YarnApplicationState state : YarnApplicationState.values()) { 511 states.add(StringUtils.toLowerCase(state.toString())); 512 } 513 } 514 // in case we extend to multiple applicationTypes in the future 515 Map<YarnApplicationState, Map<String, Long>> scoreboard = 516 buildScoreboard(states, types); 517 518 // go through the apps in RM to count the numbers, ignoring the case of 519 // the state/type name 520 ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext().getRMApps(); 521 for (RMApp rmapp : apps.values()) { 522 YarnApplicationState state = rmapp.createApplicationState(); 523 String type = StringUtils.toLowerCase(rmapp.getApplicationType().trim()); 524 if (states.contains( 525 StringUtils.toLowerCase(state.toString()))) { 526 if (types.contains(ANY)) { 527 countApp(scoreboard, state, ANY); 528 } else if (types.contains(type)) { 529 countApp(scoreboard, state, type); 530 } 531 } 532 } 533 534 // fill the response object 535 ApplicationStatisticsInfo appStatInfo = new ApplicationStatisticsInfo(); 536 for (Map.Entry<YarnApplicationState, Map<String, Long>> partScoreboard 537 : scoreboard.entrySet()) { 538 for (Map.Entry<String, Long> statEntry 539 : partScoreboard.getValue().entrySet()) { 540 StatisticsItemInfo statItem = new StatisticsItemInfo( 541 partScoreboard.getKey(), statEntry.getKey(), statEntry.getValue()); 542 appStatInfo.add(statItem); 543 } 544 } 545 return appStatInfo; 546 } 547 parseQueries( Set<String> queries, boolean isState)548 private static Set<String> parseQueries( 549 Set<String> queries, boolean isState) { 550 Set<String> params = new HashSet<String>(); 551 if (!queries.isEmpty()) { 552 for (String query : queries) { 553 if (query != null && !query.trim().isEmpty()) { 554 String[] paramStrs = query.split(","); 555 for (String paramStr : paramStrs) { 556 if (paramStr != null && !paramStr.trim().isEmpty()) { 557 if (isState) { 558 try { 559 // enum string is in the uppercase 560 YarnApplicationState.valueOf( 561 StringUtils.toUpperCase(paramStr.trim())); 562 } catch (RuntimeException e) { 563 YarnApplicationState[] stateArray = 564 YarnApplicationState.values(); 565 String allAppStates = Arrays.toString(stateArray); 566 throw new BadRequestException( 567 "Invalid application-state " + paramStr.trim() 568 + " specified. It should be one of " + allAppStates); 569 } 570 } 571 params.add( 572 StringUtils.toLowerCase(paramStr.trim())); 573 } 574 } 575 } 576 } 577 } 578 return params; 579 } 580 buildScoreboard( Set<String> states, Set<String> types)581 private static Map<YarnApplicationState, Map<String, Long>> buildScoreboard( 582 Set<String> states, Set<String> types) { 583 Map<YarnApplicationState, Map<String, Long>> scoreboard 584 = new HashMap<YarnApplicationState, Map<String, Long>>(); 585 // default states will result in enumerating all YarnApplicationStates 586 assert !states.isEmpty(); 587 for (String state : states) { 588 Map<String, Long> partScoreboard = new HashMap<String, Long>(); 589 scoreboard.put( 590 YarnApplicationState.valueOf(StringUtils.toUpperCase(state)), 591 partScoreboard); 592 // types is verified no to be empty 593 for (String type : types) { 594 partScoreboard.put(type, 0L); 595 } 596 } 597 return scoreboard; 598 } 599 countApp( Map<YarnApplicationState, Map<String, Long>> scoreboard, YarnApplicationState state, String type)600 private static void countApp( 601 Map<YarnApplicationState, Map<String, Long>> scoreboard, 602 YarnApplicationState state, String type) { 603 Map<String, Long> partScoreboard = scoreboard.get(state); 604 Long count = partScoreboard.get(type); 605 partScoreboard.put(type, count + 1L); 606 } 607 608 @GET 609 @Path("/apps/{appid}") 610 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getApp(@ontext HttpServletRequest hsr, @PathParam(R) String appId)611 public AppInfo getApp(@Context HttpServletRequest hsr, 612 @PathParam("appid") String appId) { 613 init(); 614 if (appId == null || appId.isEmpty()) { 615 throw new NotFoundException("appId, " + appId + ", is empty or null"); 616 } 617 ApplicationId id; 618 id = ConverterUtils.toApplicationId(recordFactory, appId); 619 if (id == null) { 620 throw new NotFoundException("appId is null"); 621 } 622 RMApp app = rm.getRMContext().getRMApps().get(id); 623 if (app == null) { 624 throw new NotFoundException("app with id: " + appId + " not found"); 625 } 626 return new AppInfo(rm, app, hasAccess(app, hsr), hsr.getScheme() + "://"); 627 } 628 629 @GET 630 @Path("/apps/{appid}/appattempts") 631 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getAppAttempts(@ontext HttpServletRequest hsr, @PathParam(R) String appId)632 public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr, 633 @PathParam("appid") String appId) { 634 635 init(); 636 if (appId == null || appId.isEmpty()) { 637 throw new NotFoundException("appId, " + appId + ", is empty or null"); 638 } 639 ApplicationId id; 640 id = ConverterUtils.toApplicationId(recordFactory, appId); 641 if (id == null) { 642 throw new NotFoundException("appId is null"); 643 } 644 RMApp app = rm.getRMContext().getRMApps().get(id); 645 if (app == null) { 646 throw new NotFoundException("app with id: " + appId + " not found"); 647 } 648 649 AppAttemptsInfo appAttemptsInfo = new AppAttemptsInfo(); 650 for (RMAppAttempt attempt : app.getAppAttempts().values()) { 651 AppAttemptInfo attemptInfo = 652 new AppAttemptInfo(rm, attempt, app.getUser(), hsr.getScheme() 653 + "://"); 654 appAttemptsInfo.add(attemptInfo); 655 } 656 657 return appAttemptsInfo; 658 } 659 660 @GET 661 @Path("/apps/{appid}/state") 662 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getAppState(@ontext HttpServletRequest hsr, @PathParam(R) String appId)663 public AppState getAppState(@Context HttpServletRequest hsr, 664 @PathParam("appid") String appId) throws AuthorizationException { 665 init(); 666 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 667 String userName = ""; 668 if (callerUGI != null) { 669 userName = callerUGI.getUserName(); 670 } 671 RMApp app = null; 672 try { 673 app = getRMAppForAppId(appId); 674 } catch (NotFoundException e) { 675 RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST, 676 "UNKNOWN", "RMWebService", 677 "Trying to get state of an absent application " + appId); 678 throw e; 679 } 680 681 AppState ret = new AppState(); 682 ret.setState(app.getState().toString()); 683 684 return ret; 685 } 686 687 // can't return POJO because we can't control the status code 688 // it's always set to 200 when we need to allow it to be set 689 // to 202 690 691 @PUT 692 @Path("/apps/{appid}/state") 693 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) 694 @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) updateAppState(AppState targetState, @Context HttpServletRequest hsr, @PathParam(R) String appId)695 public Response updateAppState(AppState targetState, 696 @Context HttpServletRequest hsr, @PathParam("appid") String appId) 697 throws AuthorizationException, YarnException, InterruptedException, 698 IOException { 699 700 init(); 701 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 702 if (callerUGI == null) { 703 String msg = "Unable to obtain user name, user not authenticated"; 704 throw new AuthorizationException(msg); 705 } 706 707 if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { 708 String msg = "The default static user cannot carry out this operation."; 709 return Response.status(Status.FORBIDDEN).entity(msg).build(); 710 } 711 712 String userName = callerUGI.getUserName(); 713 RMApp app = null; 714 try { 715 app = getRMAppForAppId(appId); 716 } catch (NotFoundException e) { 717 RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST, 718 "UNKNOWN", "RMWebService", "Trying to kill an absent application " 719 + appId); 720 throw e; 721 } 722 723 if (!app.getState().toString().equals(targetState.getState())) { 724 // user is attempting to change state. right we only 725 // allow users to kill the app 726 727 if (targetState.getState().equals(YarnApplicationState.KILLED.toString())) { 728 return killApp(app, callerUGI, hsr); 729 } 730 throw new BadRequestException("Only '" 731 + YarnApplicationState.KILLED.toString() 732 + "' is allowed as a target state."); 733 } 734 735 AppState ret = new AppState(); 736 ret.setState(app.getState().toString()); 737 738 return Response.status(Status.OK).entity(ret).build(); 739 } 740 741 @GET 742 @Path("/get-node-to-labels") 743 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getNodeToLabels(@ontext HttpServletRequest hsr)744 public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr) 745 throws IOException { 746 init(); 747 748 NodeToLabelsInfo ntl = new NodeToLabelsInfo(); 749 HashMap<String, NodeLabelsInfo> ntlMap = ntl.getNodeToLabels(); 750 Map<NodeId, Set<String>> nodeIdToLabels = 751 rm.getRMContext().getNodeLabelManager().getNodeLabels(); 752 753 for (Map.Entry<NodeId, Set<String>> nitle : nodeIdToLabels.entrySet()) { 754 ntlMap.put(nitle.getKey().toString(), 755 new NodeLabelsInfo(nitle.getValue())); 756 } 757 758 return ntl; 759 } 760 761 @POST 762 @Path("/replace-node-to-labels") 763 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) replaceLabelsOnNodes( final NodeToLabelsInfo newNodeToLabels, @Context HttpServletRequest hsr)764 public Response replaceLabelsOnNodes( 765 final NodeToLabelsInfo newNodeToLabels, 766 @Context HttpServletRequest hsr) 767 throws IOException { 768 init(); 769 770 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 771 if (callerUGI == null) { 772 String msg = "Unable to obtain user name, user not authenticated for" 773 + " post to .../replace-node-to-labels"; 774 throw new AuthorizationException(msg); 775 } 776 if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) { 777 String msg = "User " + callerUGI.getShortUserName() + " not authorized" 778 + " for post to .../replace-node-to-labels "; 779 throw new AuthorizationException(msg); 780 } 781 782 Map<NodeId, Set<String>> nodeIdToLabels = 783 new HashMap<NodeId, Set<String>>(); 784 785 for (Map.Entry<String, NodeLabelsInfo> nitle : 786 newNodeToLabels.getNodeToLabels().entrySet()) { 787 nodeIdToLabels.put(ConverterUtils.toNodeIdWithDefaultPort(nitle.getKey()), 788 new HashSet<String>(nitle.getValue().getNodeLabels())); 789 } 790 791 rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(nodeIdToLabels); 792 793 return Response.status(Status.OK).build(); 794 } 795 796 @GET 797 @Path("/get-node-labels") 798 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getClusterNodeLabels(@ontext HttpServletRequest hsr)799 public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr) 800 throws IOException { 801 init(); 802 803 NodeLabelsInfo ret = 804 new NodeLabelsInfo(rm.getRMContext().getNodeLabelManager() 805 .getClusterNodeLabels()); 806 807 return ret; 808 } 809 810 @POST 811 @Path("/add-node-labels") 812 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) addToClusterNodeLabels(final NodeLabelsInfo newNodeLabels, @Context HttpServletRequest hsr)813 public Response addToClusterNodeLabels(final NodeLabelsInfo newNodeLabels, 814 @Context HttpServletRequest hsr) 815 throws Exception { 816 init(); 817 818 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 819 if (callerUGI == null) { 820 String msg = "Unable to obtain user name, user not authenticated for" 821 + " post to .../add-node-labels"; 822 throw new AuthorizationException(msg); 823 } 824 if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) { 825 String msg = "User " + callerUGI.getShortUserName() + " not authorized" 826 + " for post to .../add-node-labels "; 827 throw new AuthorizationException(msg); 828 } 829 830 rm.getRMContext().getNodeLabelManager() 831 .addToCluserNodeLabels(new HashSet<String>( 832 newNodeLabels.getNodeLabels())); 833 834 return Response.status(Status.OK).build(); 835 836 } 837 838 @POST 839 @Path("/remove-node-labels") 840 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) removeFromCluserNodeLabels(final NodeLabelsInfo oldNodeLabels, @Context HttpServletRequest hsr)841 public Response removeFromCluserNodeLabels(final NodeLabelsInfo oldNodeLabels, 842 @Context HttpServletRequest hsr) 843 throws Exception { 844 init(); 845 846 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 847 if (callerUGI == null) { 848 String msg = "Unable to obtain user name, user not authenticated for" 849 + " post to .../remove-node-labels"; 850 throw new AuthorizationException(msg); 851 } 852 if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) { 853 String msg = "User " + callerUGI.getShortUserName() + " not authorized" 854 + " for post to .../remove-node-labels "; 855 throw new AuthorizationException(msg); 856 } 857 858 rm.getRMContext().getNodeLabelManager() 859 .removeFromClusterNodeLabels(new HashSet<String>( 860 oldNodeLabels.getNodeLabels())); 861 862 return Response.status(Status.OK).build(); 863 864 } 865 866 @GET 867 @Path("/nodes/{nodeId}/get-labels") 868 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getLabelsOnNode(@ontext HttpServletRequest hsr, @PathParam(R) String nodeId)869 public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr, 870 @PathParam("nodeId") String nodeId) 871 throws IOException { 872 init(); 873 874 NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId); 875 return new NodeLabelsInfo( 876 rm.getRMContext().getNodeLabelManager().getLabelsOnNode(nid)); 877 878 } 879 880 @POST 881 @Path("/nodes/{nodeId}/replace-labels") 882 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) replaceLabelsOnNode(NodeLabelsInfo newNodeLabelsInfo, @Context HttpServletRequest hsr, @PathParam(R) String nodeId)883 public Response replaceLabelsOnNode(NodeLabelsInfo newNodeLabelsInfo, 884 @Context HttpServletRequest hsr, @PathParam("nodeId") String nodeId) 885 throws Exception { 886 init(); 887 888 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 889 if (callerUGI == null) { 890 String msg = "Unable to obtain user name, user not authenticated for" 891 + " post to .../nodes/nodeid/replace-labels"; 892 throw new AuthorizationException(msg); 893 } 894 895 if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) { 896 String msg = "User " + callerUGI.getShortUserName() + " not authorized" 897 + " for post to .../nodes/nodeid/replace-labels"; 898 throw new AuthorizationException(msg); 899 } 900 901 NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId); 902 903 Map<NodeId, Set<String>> newLabelsForNode = new HashMap<NodeId, 904 Set<String>>(); 905 906 newLabelsForNode.put(nid, new HashSet<String>(newNodeLabelsInfo.getNodeLabels())); 907 908 rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(newLabelsForNode); 909 910 return Response.status(Status.OK).build(); 911 912 } 913 killApp(RMApp app, UserGroupInformation callerUGI, HttpServletRequest hsr)914 protected Response killApp(RMApp app, UserGroupInformation callerUGI, 915 HttpServletRequest hsr) throws IOException, InterruptedException { 916 917 if (app == null) { 918 throw new IllegalArgumentException("app cannot be null"); 919 } 920 String userName = callerUGI.getUserName(); 921 final ApplicationId appid = app.getApplicationId(); 922 KillApplicationResponse resp = null; 923 try { 924 resp = 925 callerUGI 926 .doAs(new PrivilegedExceptionAction<KillApplicationResponse>() { 927 @Override 928 public KillApplicationResponse run() throws IOException, 929 YarnException { 930 KillApplicationRequest req = 931 KillApplicationRequest.newInstance(appid); 932 return rm.getClientRMService().forceKillApplication(req); 933 } 934 }); 935 } catch (UndeclaredThrowableException ue) { 936 // if the root cause is a permissions issue 937 // bubble that up to the user 938 if (ue.getCause() instanceof YarnException) { 939 YarnException ye = (YarnException) ue.getCause(); 940 if (ye.getCause() instanceof AccessControlException) { 941 String appId = app.getApplicationId().toString(); 942 String msg = 943 "Unauthorized attempt to kill appid " + appId 944 + " by remote user " + userName; 945 return Response.status(Status.FORBIDDEN).entity(msg).build(); 946 } else { 947 throw ue; 948 } 949 } else { 950 throw ue; 951 } 952 } 953 954 AppState ret = new AppState(); 955 ret.setState(app.getState().toString()); 956 957 if (resp.getIsKillCompleted()) { 958 RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST, 959 "RMWebService", app.getApplicationId()); 960 } else { 961 return Response.status(Status.ACCEPTED).entity(ret) 962 .header(HttpHeaders.LOCATION, hsr.getRequestURL()).build(); 963 } 964 return Response.status(Status.OK).entity(ret).build(); 965 } 966 967 @GET 968 @Path("/apps/{appid}/queue") 969 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) getAppQueue(@ontext HttpServletRequest hsr, @PathParam(R) String appId)970 public AppQueue getAppQueue(@Context HttpServletRequest hsr, 971 @PathParam("appid") String appId) throws AuthorizationException { 972 init(); 973 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 974 String userName = "UNKNOWN-USER"; 975 if (callerUGI != null) { 976 userName = callerUGI.getUserName(); 977 } 978 RMApp app = null; 979 try { 980 app = getRMAppForAppId(appId); 981 } catch (NotFoundException e) { 982 RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST, 983 "UNKNOWN", "RMWebService", 984 "Trying to get state of an absent application " + appId); 985 throw e; 986 } 987 988 AppQueue ret = new AppQueue(); 989 ret.setQueue(app.getQueue()); 990 991 return ret; 992 } 993 994 @PUT 995 @Path("/apps/{appid}/queue") 996 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) 997 @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) updateAppQueue(AppQueue targetQueue, @Context HttpServletRequest hsr, @PathParam(R) String appId)998 public Response updateAppQueue(AppQueue targetQueue, 999 @Context HttpServletRequest hsr, @PathParam("appid") String appId) 1000 throws AuthorizationException, YarnException, InterruptedException, 1001 IOException { 1002 1003 init(); 1004 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 1005 if (callerUGI == null) { 1006 String msg = "Unable to obtain user name, user not authenticated"; 1007 throw new AuthorizationException(msg); 1008 } 1009 1010 if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { 1011 String msg = "The default static user cannot carry out this operation."; 1012 return Response.status(Status.FORBIDDEN).entity(msg).build(); 1013 } 1014 1015 String userName = callerUGI.getUserName(); 1016 RMApp app = null; 1017 try { 1018 app = getRMAppForAppId(appId); 1019 } catch (NotFoundException e) { 1020 RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST, 1021 "UNKNOWN", "RMWebService", "Trying to move an absent application " 1022 + appId); 1023 throw e; 1024 } 1025 1026 if (!app.getQueue().equals(targetQueue.getQueue())) { 1027 // user is attempting to change queue. 1028 return moveApp(app, callerUGI, targetQueue.getQueue()); 1029 } 1030 1031 AppQueue ret = new AppQueue(); 1032 ret.setQueue(app.getQueue()); 1033 1034 return Response.status(Status.OK).entity(ret).build(); 1035 } 1036 moveApp(RMApp app, UserGroupInformation callerUGI, String targetQueue)1037 protected Response moveApp(RMApp app, UserGroupInformation callerUGI, 1038 String targetQueue) throws IOException, InterruptedException { 1039 1040 if (app == null) { 1041 throw new IllegalArgumentException("app cannot be null"); 1042 } 1043 String userName = callerUGI.getUserName(); 1044 final ApplicationId appid = app.getApplicationId(); 1045 final String reqTargetQueue = targetQueue; 1046 try { 1047 callerUGI 1048 .doAs(new PrivilegedExceptionAction<Void>() { 1049 @Override 1050 public Void run() throws IOException, 1051 YarnException { 1052 MoveApplicationAcrossQueuesRequest req = 1053 MoveApplicationAcrossQueuesRequest.newInstance(appid, 1054 reqTargetQueue); 1055 rm.getClientRMService().moveApplicationAcrossQueues(req); 1056 return null; 1057 } 1058 }); 1059 } catch (UndeclaredThrowableException ue) { 1060 // if the root cause is a permissions issue 1061 // bubble that up to the user 1062 if (ue.getCause() instanceof YarnException) { 1063 YarnException ye = (YarnException) ue.getCause(); 1064 if (ye.getCause() instanceof AccessControlException) { 1065 String appId = app.getApplicationId().toString(); 1066 String msg = 1067 "Unauthorized attempt to move appid " + appId 1068 + " by remote user " + userName; 1069 return Response.status(Status.FORBIDDEN).entity(msg).build(); 1070 } else if (ye.getMessage().startsWith("App in") 1071 && ye.getMessage().endsWith("state cannot be moved.")) { 1072 return Response.status(Status.BAD_REQUEST).entity(ye.getMessage()) 1073 .build(); 1074 } else { 1075 throw ue; 1076 } 1077 } else { 1078 throw ue; 1079 } 1080 } 1081 1082 AppQueue ret = new AppQueue(); 1083 ret.setQueue(app.getQueue()); 1084 return Response.status(Status.OK).entity(ret).build(); 1085 } 1086 getRMAppForAppId(String appId)1087 private RMApp getRMAppForAppId(String appId) { 1088 1089 if (appId == null || appId.isEmpty()) { 1090 throw new NotFoundException("appId, " + appId + ", is empty or null"); 1091 } 1092 ApplicationId id; 1093 try { 1094 id = ConverterUtils.toApplicationId(recordFactory, appId); 1095 } catch (NumberFormatException e) { 1096 throw new NotFoundException("appId is invalid"); 1097 } 1098 if (id == null) { 1099 throw new NotFoundException("appId is invalid"); 1100 } 1101 RMApp app = rm.getRMContext().getRMApps().get(id); 1102 if (app == null) { 1103 throw new NotFoundException("app with id: " + appId + " not found"); 1104 } 1105 return app; 1106 } 1107 getCallerUserGroupInformation( HttpServletRequest hsr, boolean usePrincipal)1108 private UserGroupInformation getCallerUserGroupInformation( 1109 HttpServletRequest hsr, boolean usePrincipal) { 1110 1111 String remoteUser = hsr.getRemoteUser(); 1112 if (usePrincipal) { 1113 Principal princ = hsr.getUserPrincipal(); 1114 remoteUser = princ == null ? null : princ.getName(); 1115 } 1116 1117 UserGroupInformation callerUGI = null; 1118 if (remoteUser != null) { 1119 callerUGI = UserGroupInformation.createRemoteUser(remoteUser); 1120 } 1121 1122 return callerUGI; 1123 } 1124 isStaticUser(UserGroupInformation callerUGI)1125 private boolean isStaticUser(UserGroupInformation callerUGI) { 1126 String staticUser = 1127 conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER, 1128 CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER); 1129 return staticUser.equals(callerUGI.getUserName()); 1130 } 1131 1132 /** 1133 * Generates a new ApplicationId which is then sent to the client 1134 * 1135 * @param hsr 1136 * the servlet request 1137 * @return Response containing the app id and the maximum resource 1138 * capabilities 1139 * @throws AuthorizationException 1140 * @throws IOException 1141 * @throws InterruptedException 1142 */ 1143 @POST 1144 @Path("/apps/new-application") 1145 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) createNewApplication(@ontext HttpServletRequest hsr)1146 public Response createNewApplication(@Context HttpServletRequest hsr) 1147 throws AuthorizationException, IOException, InterruptedException { 1148 init(); 1149 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 1150 if (callerUGI == null) { 1151 throw new AuthorizationException("Unable to obtain user name, " 1152 + "user not authenticated"); 1153 } 1154 if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { 1155 String msg = "The default static user cannot carry out this operation."; 1156 return Response.status(Status.FORBIDDEN).entity(msg).build(); 1157 } 1158 1159 NewApplication appId = createNewApplication(); 1160 return Response.status(Status.OK).entity(appId).build(); 1161 1162 } 1163 1164 // reuse the code in ClientRMService to create new app 1165 // get the new app id and submit app 1166 // set location header with new app location 1167 /** 1168 * Function to submit an app to the RM 1169 * 1170 * @param newApp 1171 * structure containing information to construct the 1172 * ApplicationSubmissionContext 1173 * @param hsr 1174 * the servlet request 1175 * @return Response containing the status code 1176 * @throws AuthorizationException 1177 * @throws IOException 1178 * @throws InterruptedException 1179 */ 1180 @POST 1181 @Path("/apps") 1182 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) 1183 @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) submitApplication(ApplicationSubmissionContextInfo newApp, @Context HttpServletRequest hsr)1184 public Response submitApplication(ApplicationSubmissionContextInfo newApp, 1185 @Context HttpServletRequest hsr) throws AuthorizationException, 1186 IOException, InterruptedException { 1187 1188 init(); 1189 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 1190 if (callerUGI == null) { 1191 throw new AuthorizationException("Unable to obtain user name, " 1192 + "user not authenticated"); 1193 } 1194 1195 if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { 1196 String msg = "The default static user cannot carry out this operation."; 1197 return Response.status(Status.FORBIDDEN).entity(msg).build(); 1198 } 1199 1200 ApplicationSubmissionContext appContext = 1201 createAppSubmissionContext(newApp); 1202 final SubmitApplicationRequest req = 1203 SubmitApplicationRequest.newInstance(appContext); 1204 1205 try { 1206 callerUGI 1207 .doAs(new PrivilegedExceptionAction<SubmitApplicationResponse>() { 1208 @Override 1209 public SubmitApplicationResponse run() throws IOException, 1210 YarnException { 1211 return rm.getClientRMService().submitApplication(req); 1212 } 1213 }); 1214 } catch (UndeclaredThrowableException ue) { 1215 if (ue.getCause() instanceof YarnException) { 1216 throw new BadRequestException(ue.getCause().getMessage()); 1217 } 1218 LOG.info("Submit app request failed", ue); 1219 throw ue; 1220 } 1221 1222 String url = hsr.getRequestURL() + "/" + newApp.getApplicationId(); 1223 return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, url) 1224 .build(); 1225 } 1226 1227 /** 1228 * Function that actually creates the ApplicationId by calling the 1229 * ClientRMService 1230 * 1231 * @return returns structure containing the app-id and maximum resource 1232 * capabilities 1233 */ createNewApplication()1234 private NewApplication createNewApplication() { 1235 GetNewApplicationRequest req = 1236 recordFactory.newRecordInstance(GetNewApplicationRequest.class); 1237 GetNewApplicationResponse resp; 1238 try { 1239 resp = rm.getClientRMService().getNewApplication(req); 1240 } catch (YarnException e) { 1241 String msg = "Unable to create new app from RM web service"; 1242 LOG.error(msg, e); 1243 throw new YarnRuntimeException(msg, e); 1244 } 1245 NewApplication appId = 1246 new NewApplication(resp.getApplicationId().toString(), 1247 new ResourceInfo(resp.getMaximumResourceCapability())); 1248 return appId; 1249 } 1250 1251 /** 1252 * Create the actual ApplicationSubmissionContext to be submitted to the RM 1253 * from the information provided by the user. 1254 * 1255 * @param newApp 1256 * the information provided by the user 1257 * @return returns the constructed ApplicationSubmissionContext 1258 * @throws IOException 1259 */ createAppSubmissionContext( ApplicationSubmissionContextInfo newApp)1260 protected ApplicationSubmissionContext createAppSubmissionContext( 1261 ApplicationSubmissionContextInfo newApp) throws IOException { 1262 1263 // create local resources and app submission context 1264 1265 ApplicationId appid; 1266 String error = 1267 "Could not parse application id " + newApp.getApplicationId(); 1268 try { 1269 appid = 1270 ConverterUtils.toApplicationId(recordFactory, 1271 newApp.getApplicationId()); 1272 } catch (Exception e) { 1273 throw new BadRequestException(error); 1274 } 1275 ApplicationSubmissionContext appContext = 1276 ApplicationSubmissionContext.newInstance(appid, 1277 newApp.getApplicationName(), newApp.getQueue(), 1278 Priority.newInstance(newApp.getPriority()), 1279 createContainerLaunchContext(newApp), newApp.getUnmanagedAM(), 1280 newApp.getCancelTokensWhenComplete(), newApp.getMaxAppAttempts(), 1281 createAppSubmissionContextResource(newApp), 1282 newApp.getApplicationType(), 1283 newApp.getKeepContainersAcrossApplicationAttempts(), 1284 newApp.getAppNodeLabelExpression(), 1285 newApp.getAMContainerNodeLabelExpression()); 1286 appContext.setApplicationTags(newApp.getApplicationTags()); 1287 1288 return appContext; 1289 } 1290 createAppSubmissionContextResource( ApplicationSubmissionContextInfo newApp)1291 protected Resource createAppSubmissionContextResource( 1292 ApplicationSubmissionContextInfo newApp) throws BadRequestException { 1293 if (newApp.getResource().getvCores() > rm.getConfig().getInt( 1294 YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1295 YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)) { 1296 String msg = "Requested more cores than configured max"; 1297 throw new BadRequestException(msg); 1298 } 1299 if (newApp.getResource().getMemory() > rm.getConfig().getInt( 1300 YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1301 YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) { 1302 String msg = "Requested more memory than configured max"; 1303 throw new BadRequestException(msg); 1304 } 1305 Resource r = 1306 Resource.newInstance(newApp.getResource().getMemory(), newApp 1307 .getResource().getvCores()); 1308 return r; 1309 } 1310 1311 /** 1312 * Create the ContainerLaunchContext required for the 1313 * ApplicationSubmissionContext. This function takes the user information and 1314 * generates the ByteBuffer structures required by the ContainerLaunchContext 1315 * 1316 * @param newApp 1317 * the information provided by the user 1318 * @return created context 1319 * @throws BadRequestException 1320 * @throws IOException 1321 */ createContainerLaunchContext( ApplicationSubmissionContextInfo newApp)1322 protected ContainerLaunchContext createContainerLaunchContext( 1323 ApplicationSubmissionContextInfo newApp) throws BadRequestException, 1324 IOException { 1325 1326 // create container launch context 1327 1328 HashMap<String, ByteBuffer> hmap = new HashMap<String, ByteBuffer>(); 1329 for (Map.Entry<String, String> entry : newApp 1330 .getContainerLaunchContextInfo().getAuxillaryServiceData().entrySet()) { 1331 if (entry.getValue().isEmpty() == false) { 1332 Base64 decoder = new Base64(0, null, true); 1333 byte[] data = decoder.decode(entry.getValue()); 1334 hmap.put(entry.getKey(), ByteBuffer.wrap(data)); 1335 } 1336 } 1337 1338 HashMap<String, LocalResource> hlr = new HashMap<String, LocalResource>(); 1339 for (Map.Entry<String, LocalResourceInfo> entry : newApp 1340 .getContainerLaunchContextInfo().getResources().entrySet()) { 1341 LocalResourceInfo l = entry.getValue(); 1342 LocalResource lr = 1343 LocalResource.newInstance( 1344 ConverterUtils.getYarnUrlFromURI(l.getUrl()), l.getType(), 1345 l.getVisibility(), l.getSize(), l.getTimestamp()); 1346 hlr.put(entry.getKey(), lr); 1347 } 1348 1349 DataOutputBuffer out = new DataOutputBuffer(); 1350 Credentials cs = 1351 createCredentials(newApp.getContainerLaunchContextInfo() 1352 .getCredentials()); 1353 cs.writeTokenStorageToStream(out); 1354 ByteBuffer tokens = ByteBuffer.wrap(out.getData()); 1355 1356 ContainerLaunchContext ctx = 1357 ContainerLaunchContext.newInstance(hlr, newApp 1358 .getContainerLaunchContextInfo().getEnvironment(), newApp 1359 .getContainerLaunchContextInfo().getCommands(), hmap, tokens, newApp 1360 .getContainerLaunchContextInfo().getAcls()); 1361 1362 return ctx; 1363 } 1364 1365 /** 1366 * Generate a Credentials object from the information in the CredentialsInfo 1367 * object. 1368 * 1369 * @param credentials 1370 * the CredentialsInfo provided by the user. 1371 * @return 1372 */ createCredentials(CredentialsInfo credentials)1373 private Credentials createCredentials(CredentialsInfo credentials) { 1374 Credentials ret = new Credentials(); 1375 try { 1376 for (Map.Entry<String, String> entry : credentials.getTokens().entrySet()) { 1377 Text alias = new Text(entry.getKey()); 1378 Token<TokenIdentifier> token = new Token<TokenIdentifier>(); 1379 token.decodeFromUrlString(entry.getValue()); 1380 ret.addToken(alias, token); 1381 } 1382 for (Map.Entry<String, String> entry : credentials.getSecrets().entrySet()) { 1383 Text alias = new Text(entry.getKey()); 1384 Base64 decoder = new Base64(0, null, true); 1385 byte[] secret = decoder.decode(entry.getValue()); 1386 ret.addSecretKey(alias, secret); 1387 } 1388 } catch (IOException ie) { 1389 throw new BadRequestException( 1390 "Could not parse credentials data; exception message = " 1391 + ie.getMessage()); 1392 } 1393 return ret; 1394 } 1395 createKerberosUserGroupInformation( HttpServletRequest hsr)1396 private UserGroupInformation createKerberosUserGroupInformation( 1397 HttpServletRequest hsr) throws AuthorizationException, YarnException { 1398 1399 UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); 1400 if (callerUGI == null) { 1401 String msg = "Unable to obtain user name, user not authenticated"; 1402 throw new AuthorizationException(msg); 1403 } 1404 1405 String authType = hsr.getAuthType(); 1406 if (!KerberosAuthenticationHandler.TYPE.equalsIgnoreCase(authType)) { 1407 String msg = 1408 "Delegation token operations can only be carried out on a " 1409 + "Kerberos authenticated channel. Expected auth type is " 1410 + KerberosAuthenticationHandler.TYPE + ", got type " + authType; 1411 throw new YarnException(msg); 1412 } 1413 if (hsr 1414 .getAttribute(DelegationTokenAuthenticationHandler.DELEGATION_TOKEN_UGI_ATTRIBUTE) != null) { 1415 String msg = 1416 "Delegation token operations cannot be carried out using delegation" 1417 + " token authentication."; 1418 throw new YarnException(msg); 1419 } 1420 1421 callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS); 1422 return callerUGI; 1423 } 1424 1425 @POST 1426 @Path("/delegation-token") 1427 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) 1428 @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) postDelegationToken(DelegationToken tokenData, @Context HttpServletRequest hsr)1429 public Response postDelegationToken(DelegationToken tokenData, 1430 @Context HttpServletRequest hsr) throws AuthorizationException, 1431 IOException, InterruptedException, Exception { 1432 1433 init(); 1434 UserGroupInformation callerUGI; 1435 try { 1436 callerUGI = createKerberosUserGroupInformation(hsr); 1437 } catch (YarnException ye) { 1438 return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build(); 1439 } 1440 return createDelegationToken(tokenData, hsr, callerUGI); 1441 } 1442 1443 @POST 1444 @Path("/delegation-token/expiration") 1445 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) 1446 @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) 1447 public Response postDelegationTokenExpiration(@ontext HttpServletRequest hsr)1448 postDelegationTokenExpiration(@Context HttpServletRequest hsr) 1449 throws AuthorizationException, IOException, InterruptedException, 1450 Exception { 1451 1452 init(); 1453 UserGroupInformation callerUGI; 1454 try { 1455 callerUGI = createKerberosUserGroupInformation(hsr); 1456 } catch (YarnException ye) { 1457 return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build(); 1458 } 1459 1460 DelegationToken requestToken = new DelegationToken(); 1461 requestToken.setToken(extractToken(hsr).encodeToUrlString()); 1462 return renewDelegationToken(requestToken, hsr, callerUGI); 1463 } 1464 createDelegationToken(DelegationToken tokenData, HttpServletRequest hsr, UserGroupInformation callerUGI)1465 private Response createDelegationToken(DelegationToken tokenData, 1466 HttpServletRequest hsr, UserGroupInformation callerUGI) 1467 throws AuthorizationException, IOException, InterruptedException, 1468 Exception { 1469 1470 final String renewer = tokenData.getRenewer(); 1471 GetDelegationTokenResponse resp; 1472 try { 1473 resp = 1474 callerUGI 1475 .doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() { 1476 @Override 1477 public GetDelegationTokenResponse run() throws IOException, 1478 YarnException { 1479 GetDelegationTokenRequest createReq = 1480 GetDelegationTokenRequest.newInstance(renewer); 1481 return rm.getClientRMService().getDelegationToken(createReq); 1482 } 1483 }); 1484 } catch (Exception e) { 1485 LOG.info("Create delegation token request failed", e); 1486 throw e; 1487 } 1488 1489 Token<RMDelegationTokenIdentifier> tk = 1490 new Token<RMDelegationTokenIdentifier>(resp.getRMDelegationToken() 1491 .getIdentifier().array(), resp.getRMDelegationToken().getPassword() 1492 .array(), new Text(resp.getRMDelegationToken().getKind()), new Text( 1493 resp.getRMDelegationToken().getService())); 1494 RMDelegationTokenIdentifier identifier = tk.decodeIdentifier(); 1495 long currentExpiration = 1496 rm.getRMContext().getRMDelegationTokenSecretManager() 1497 .getRenewDate(identifier); 1498 DelegationToken respToken = 1499 new DelegationToken(tk.encodeToUrlString(), renewer, identifier 1500 .getOwner().toString(), tk.getKind().toString(), currentExpiration, 1501 identifier.getMaxDate()); 1502 return Response.status(Status.OK).entity(respToken).build(); 1503 } 1504 renewDelegationToken(DelegationToken tokenData, HttpServletRequest hsr, UserGroupInformation callerUGI)1505 private Response renewDelegationToken(DelegationToken tokenData, 1506 HttpServletRequest hsr, UserGroupInformation callerUGI) 1507 throws AuthorizationException, IOException, InterruptedException, 1508 Exception { 1509 1510 Token<RMDelegationTokenIdentifier> token = 1511 extractToken(tokenData.getToken()); 1512 1513 org.apache.hadoop.yarn.api.records.Token dToken = 1514 BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind() 1515 .toString(), token.getPassword(), token.getService().toString()); 1516 final RenewDelegationTokenRequest req = 1517 RenewDelegationTokenRequest.newInstance(dToken); 1518 1519 RenewDelegationTokenResponse resp; 1520 try { 1521 resp = 1522 callerUGI 1523 .doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() { 1524 @Override 1525 public RenewDelegationTokenResponse run() throws IOException, 1526 YarnException { 1527 return rm.getClientRMService().renewDelegationToken(req); 1528 } 1529 }); 1530 } catch (UndeclaredThrowableException ue) { 1531 if (ue.getCause() instanceof YarnException) { 1532 if (ue.getCause().getCause() instanceof InvalidToken) { 1533 throw new BadRequestException(ue.getCause().getCause().getMessage()); 1534 } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) { 1535 return Response.status(Status.FORBIDDEN) 1536 .entity(ue.getCause().getCause().getMessage()).build(); 1537 } 1538 LOG.info("Renew delegation token request failed", ue); 1539 throw ue; 1540 } 1541 LOG.info("Renew delegation token request failed", ue); 1542 throw ue; 1543 } catch (Exception e) { 1544 LOG.info("Renew delegation token request failed", e); 1545 throw e; 1546 } 1547 long renewTime = resp.getNextExpirationTime(); 1548 1549 DelegationToken respToken = new DelegationToken(); 1550 respToken.setNextExpirationTime(renewTime); 1551 return Response.status(Status.OK).entity(respToken).build(); 1552 } 1553 1554 // For cancelling tokens, the encoded token is passed as a header 1555 // There are two reasons for this - 1556 // 1. Passing a request body as part of a DELETE request is not 1557 // allowed by Jetty 1558 // 2. Passing the encoded token as part of the url is not ideal 1559 // since urls tend to get logged and anyone with access to 1560 // the logs can extract tokens which are meant to be secret 1561 @DELETE 1562 @Path("/delegation-token") 1563 @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) cancelDelegationToken(@ontext HttpServletRequest hsr)1564 public Response cancelDelegationToken(@Context HttpServletRequest hsr) 1565 throws AuthorizationException, IOException, InterruptedException, 1566 Exception { 1567 1568 init(); 1569 UserGroupInformation callerUGI; 1570 try { 1571 callerUGI = createKerberosUserGroupInformation(hsr); 1572 } catch (YarnException ye) { 1573 return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build(); 1574 } 1575 1576 Token<RMDelegationTokenIdentifier> token = extractToken(hsr); 1577 1578 org.apache.hadoop.yarn.api.records.Token dToken = 1579 BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind() 1580 .toString(), token.getPassword(), token.getService().toString()); 1581 final CancelDelegationTokenRequest req = 1582 CancelDelegationTokenRequest.newInstance(dToken); 1583 1584 try { 1585 callerUGI 1586 .doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() { 1587 @Override 1588 public CancelDelegationTokenResponse run() throws IOException, 1589 YarnException { 1590 return rm.getClientRMService().cancelDelegationToken(req); 1591 } 1592 }); 1593 } catch (UndeclaredThrowableException ue) { 1594 if (ue.getCause() instanceof YarnException) { 1595 if (ue.getCause().getCause() instanceof InvalidToken) { 1596 throw new BadRequestException(ue.getCause().getCause().getMessage()); 1597 } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) { 1598 return Response.status(Status.FORBIDDEN) 1599 .entity(ue.getCause().getCause().getMessage()).build(); 1600 } 1601 LOG.info("Renew delegation token request failed", ue); 1602 throw ue; 1603 } 1604 LOG.info("Renew delegation token request failed", ue); 1605 throw ue; 1606 } catch (Exception e) { 1607 LOG.info("Renew delegation token request failed", e); 1608 throw e; 1609 } 1610 1611 return Response.status(Status.OK).build(); 1612 } 1613 extractToken( HttpServletRequest request)1614 private Token<RMDelegationTokenIdentifier> extractToken( 1615 HttpServletRequest request) { 1616 String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER); 1617 if (encodedToken == null) { 1618 String msg = 1619 "Header '" + DELEGATION_TOKEN_HEADER 1620 + "' containing encoded token not found"; 1621 throw new BadRequestException(msg); 1622 } 1623 return extractToken(encodedToken); 1624 } 1625 extractToken(String encodedToken)1626 private Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) { 1627 Token<RMDelegationTokenIdentifier> token = 1628 new Token<RMDelegationTokenIdentifier>(); 1629 try { 1630 token.decodeFromUrlString(encodedToken); 1631 } catch (Exception ie) { 1632 String msg = "Could not decode encoded token"; 1633 throw new BadRequestException(msg); 1634 } 1635 return token; 1636 } 1637 } 1638