1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
20 
21 import java.io.IOException;
22 import java.lang.reflect.UndeclaredThrowableException;
23 import java.security.AccessControlException;
24 import java.nio.ByteBuffer;
25 import java.security.Principal;
26 import java.security.PrivilegedExceptionAction;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.EnumSet;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentMap;
36 
37 import javax.servlet.http.HttpServletRequest;
38 import javax.servlet.http.HttpServletResponse;
39 import javax.ws.rs.Consumes;
40 import javax.ws.rs.DELETE;
41 import javax.ws.rs.GET;
42 import javax.ws.rs.POST;
43 import javax.ws.rs.PUT;
44 import javax.ws.rs.Path;
45 import javax.ws.rs.PathParam;
46 import javax.ws.rs.Produces;
47 import javax.ws.rs.QueryParam;
48 import javax.ws.rs.core.Context;
49 import javax.ws.rs.core.HttpHeaders;
50 import javax.ws.rs.core.MediaType;
51 import javax.ws.rs.core.Response;
52 import javax.ws.rs.core.Response.Status;
53 
54 import org.apache.commons.codec.binary.Base64;
55 import org.apache.commons.logging.Log;
56 import org.apache.commons.logging.LogFactory;
57 import org.apache.hadoop.conf.Configuration;
58 import org.apache.hadoop.fs.CommonConfigurationKeys;
59 import org.apache.hadoop.io.DataOutputBuffer;
60 import org.apache.hadoop.io.Text;
61 import org.apache.hadoop.security.Credentials;
62 import org.apache.hadoop.security.UserGroupInformation;
63 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
64 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
65 import org.apache.hadoop.security.authorize.AuthorizationException;
66 import org.apache.hadoop.security.token.Token;
67 import org.apache.hadoop.security.token.TokenIdentifier;
68 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
69 import org.apache.hadoop.util.StringUtils;
70 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
71 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
72 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
73 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
74 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
75 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
76 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
77 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
78 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
79 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
80 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
81 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
82 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
83 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
84 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
85 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
86 import org.apache.hadoop.yarn.api.records.ApplicationId;
87 import org.apache.hadoop.yarn.api.records.ApplicationReport;
88 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
89 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
90 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
91 import org.apache.hadoop.yarn.api.records.LocalResource;
92 import org.apache.hadoop.yarn.api.records.NodeId;
93 import org.apache.hadoop.yarn.api.records.NodeState;
94 import org.apache.hadoop.yarn.api.records.Priority;
95 import org.apache.hadoop.yarn.api.records.QueueACL;
96 import org.apache.hadoop.yarn.api.records.Resource;
97 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
98 import org.apache.hadoop.yarn.conf.YarnConfiguration;
99 import org.apache.hadoop.yarn.exceptions.YarnException;
100 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
101 import org.apache.hadoop.yarn.factories.RecordFactory;
102 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
103 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
104 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
105 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
106 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
107 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
108 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
109 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
110 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
111 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
112 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
113 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
114 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
115 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
116 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
117 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
118 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
119 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
120 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
121 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
122 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
123 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
124 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
125 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
126 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
127 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
128 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
129 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
130 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
131 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
132 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
133 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
134 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
135 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
136 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
137 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
138 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
139 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
140 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
141 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
142 import org.apache.hadoop.yarn.util.ConverterUtils;
143 import org.apache.hadoop.yarn.webapp.BadRequestException;
144 import org.apache.hadoop.yarn.webapp.NotFoundException;
145 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
146 
147 import com.google.inject.Inject;
148 import com.google.inject.Singleton;
149 
150 @Singleton
151 @Path("/ws/v1/cluster")
152 public class RMWebServices {
153   private static final Log LOG =
154       LogFactory.getLog(RMWebServices.class.getName());
155   private static final String EMPTY = "";
156   private static final String ANY = "*";
157   private final ResourceManager rm;
158   private static RecordFactory recordFactory = RecordFactoryProvider
159       .getRecordFactory(null);
160   private final Configuration conf;
161   private @Context HttpServletResponse response;
162 
163   public final static String DELEGATION_TOKEN_HEADER =
164       "Hadoop-YARN-RM-Delegation-Token";
165 
166   @Inject
RMWebServices(final ResourceManager rm, Configuration conf)167   public RMWebServices(final ResourceManager rm, Configuration conf) {
168     this.rm = rm;
169     this.conf = conf;
170   }
171 
RMWebServices(ResourceManager rm, Configuration conf, HttpServletResponse response)172   RMWebServices(ResourceManager rm, Configuration conf,
173       HttpServletResponse response) {
174     this(rm, conf);
175     this.response = response;
176   }
177 
hasAccess(RMApp app, HttpServletRequest hsr)178   protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) {
179     // Check for the authorization.
180     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
181     if (callerUGI != null
182         && !(this.rm.getApplicationACLsManager().checkAccess(callerUGI,
183               ApplicationAccessType.VIEW_APP, app.getUser(),
184               app.getApplicationId()) ||
185             this.rm.getQueueACLsManager().checkAccess(callerUGI,
186               QueueACL.ADMINISTER_QUEUE, app.getQueue()))) {
187       return false;
188     }
189     return true;
190   }
191 
init()192   private void init() {
193     //clear content type
194     response.setContentType(null);
195   }
196 
197   @GET
198   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
get()199   public ClusterInfo get() {
200     return getClusterInfo();
201   }
202 
203   @GET
204   @Path("/info")
205   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getClusterInfo()206   public ClusterInfo getClusterInfo() {
207     init();
208     return new ClusterInfo(this.rm);
209   }
210 
211   @GET
212   @Path("/metrics")
213   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getClusterMetricsInfo()214   public ClusterMetricsInfo getClusterMetricsInfo() {
215     init();
216     return new ClusterMetricsInfo(this.rm);
217   }
218 
219   @GET
220   @Path("/scheduler")
221   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getSchedulerInfo()222   public SchedulerTypeInfo getSchedulerInfo() {
223     init();
224     ResourceScheduler rs = rm.getResourceScheduler();
225     SchedulerInfo sinfo;
226     if (rs instanceof CapacityScheduler) {
227       CapacityScheduler cs = (CapacityScheduler) rs;
228       CSQueue root = cs.getRootQueue();
229       sinfo = new CapacitySchedulerInfo(root);
230     } else if (rs instanceof FairScheduler) {
231       FairScheduler fs = (FairScheduler) rs;
232       sinfo = new FairSchedulerInfo(fs);
233     } else if (rs instanceof FifoScheduler) {
234       sinfo = new FifoSchedulerInfo(this.rm);
235     } else {
236       throw new NotFoundException("Unknown scheduler configured");
237     }
238     return new SchedulerTypeInfo(sinfo);
239   }
240 
241   /**
242    * Returns all nodes in the cluster. If the states param is given, returns
243    * all nodes that are in the comma-separated list of states.
244    */
245   @GET
246   @Path("/nodes")
247   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getNodes(@ueryParamR) String states)248   public NodesInfo getNodes(@QueryParam("states") String states) {
249     init();
250     ResourceScheduler sched = this.rm.getResourceScheduler();
251     if (sched == null) {
252       throw new NotFoundException("Null ResourceScheduler instance");
253     }
254 
255     EnumSet<NodeState> acceptedStates;
256     if (states == null) {
257       acceptedStates = EnumSet.allOf(NodeState.class);
258     } else {
259       acceptedStates = EnumSet.noneOf(NodeState.class);
260       for (String stateStr : states.split(",")) {
261         acceptedStates.add(
262             NodeState.valueOf(StringUtils.toUpperCase(stateStr)));
263       }
264     }
265 
266     Collection<RMNode> rmNodes = RMServerUtils.queryRMNodes(this.rm.getRMContext(),
267         acceptedStates);
268     NodesInfo nodesInfo = new NodesInfo();
269     for (RMNode rmNode : rmNodes) {
270       NodeInfo nodeInfo = new NodeInfo(rmNode, sched);
271       if (EnumSet.of(NodeState.LOST, NodeState.DECOMMISSIONED, NodeState.REBOOTED)
272           .contains(rmNode.getState())) {
273         nodeInfo.setNodeHTTPAddress(EMPTY);
274       }
275       nodesInfo.add(nodeInfo);
276     }
277 
278     return nodesInfo;
279   }
280 
281   @GET
282   @Path("/nodes/{nodeId}")
283   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getNode(@athParamR) String nodeId)284   public NodeInfo getNode(@PathParam("nodeId") String nodeId) {
285     init();
286     if (nodeId == null || nodeId.isEmpty()) {
287       throw new NotFoundException("nodeId, " + nodeId + ", is empty or null");
288     }
289     ResourceScheduler sched = this.rm.getResourceScheduler();
290     if (sched == null) {
291       throw new NotFoundException("Null ResourceScheduler instance");
292     }
293     NodeId nid = ConverterUtils.toNodeId(nodeId);
294     RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
295     boolean isInactive = false;
296     if (ni == null) {
297       ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost());
298       if (ni == null) {
299         throw new NotFoundException("nodeId, " + nodeId + ", is not found");
300       }
301       isInactive = true;
302     }
303     NodeInfo nodeInfo = new NodeInfo(ni, sched);
304     if (isInactive) {
305       nodeInfo.setNodeHTTPAddress(EMPTY);
306     }
307     return nodeInfo;
308   }
309 
310   @GET
311   @Path("/apps")
312   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getApps(@ontext HttpServletRequest hsr, @QueryParam(R) String stateQuery, @QueryParam(R) Set<String> statesQuery, @QueryParam(R) String finalStatusQuery, @QueryParam(R) String userQuery, @QueryParam(R) String queueQuery, @QueryParam(R) String count, @QueryParam(R) String startedBegin, @QueryParam(R) String startedEnd, @QueryParam(R) String finishBegin, @QueryParam(R) String finishEnd, @QueryParam(R) Set<String> applicationTypes, @QueryParam(R) Set<String> applicationTags)313   public AppsInfo getApps(@Context HttpServletRequest hsr,
314       @QueryParam("state") String stateQuery,
315       @QueryParam("states") Set<String> statesQuery,
316       @QueryParam("finalStatus") String finalStatusQuery,
317       @QueryParam("user") String userQuery,
318       @QueryParam("queue") String queueQuery,
319       @QueryParam("limit") String count,
320       @QueryParam("startedTimeBegin") String startedBegin,
321       @QueryParam("startedTimeEnd") String startedEnd,
322       @QueryParam("finishedTimeBegin") String finishBegin,
323       @QueryParam("finishedTimeEnd") String finishEnd,
324       @QueryParam("applicationTypes") Set<String> applicationTypes,
325       @QueryParam("applicationTags") Set<String> applicationTags) {
326     boolean checkCount = false;
327     boolean checkStart = false;
328     boolean checkEnd = false;
329     boolean checkAppTypes = false;
330     boolean checkAppStates = false;
331     boolean checkAppTags = false;
332     long countNum = 0;
333 
334     // set values suitable in case both of begin/end not specified
335     long sBegin = 0;
336     long sEnd = Long.MAX_VALUE;
337     long fBegin = 0;
338     long fEnd = Long.MAX_VALUE;
339 
340     init();
341     if (count != null && !count.isEmpty()) {
342       checkCount = true;
343       countNum = Long.parseLong(count);
344       if (countNum <= 0) {
345         throw new BadRequestException("limit value must be greater then 0");
346       }
347     }
348 
349     if (startedBegin != null && !startedBegin.isEmpty()) {
350       checkStart = true;
351       sBegin = Long.parseLong(startedBegin);
352       if (sBegin < 0) {
353         throw new BadRequestException("startedTimeBegin must be greater than 0");
354       }
355     }
356     if (startedEnd != null && !startedEnd.isEmpty()) {
357       checkStart = true;
358       sEnd = Long.parseLong(startedEnd);
359       if (sEnd < 0) {
360         throw new BadRequestException("startedTimeEnd must be greater than 0");
361       }
362     }
363     if (sBegin > sEnd) {
364       throw new BadRequestException(
365           "startedTimeEnd must be greater than startTimeBegin");
366     }
367 
368     if (finishBegin != null && !finishBegin.isEmpty()) {
369       checkEnd = true;
370       fBegin = Long.parseLong(finishBegin);
371       if (fBegin < 0) {
372         throw new BadRequestException("finishTimeBegin must be greater than 0");
373       }
374     }
375     if (finishEnd != null && !finishEnd.isEmpty()) {
376       checkEnd = true;
377       fEnd = Long.parseLong(finishEnd);
378       if (fEnd < 0) {
379         throw new BadRequestException("finishTimeEnd must be greater than 0");
380       }
381     }
382     if (fBegin > fEnd) {
383       throw new BadRequestException(
384           "finishTimeEnd must be greater than finishTimeBegin");
385     }
386 
387     Set<String> appTypes = parseQueries(applicationTypes, false);
388     if (!appTypes.isEmpty()) {
389       checkAppTypes = true;
390     }
391 
392     Set<String> appTags = parseQueries(applicationTags, false);
393     if (!appTags.isEmpty()) {
394       checkAppTags = true;
395     }
396 
397     // stateQuery is deprecated.
398     if (stateQuery != null && !stateQuery.isEmpty()) {
399       statesQuery.add(stateQuery);
400     }
401     Set<String> appStates = parseQueries(statesQuery, true);
402     if (!appStates.isEmpty()) {
403       checkAppStates = true;
404     }
405 
406     GetApplicationsRequest request = GetApplicationsRequest.newInstance();
407 
408     if (checkStart) {
409       request.setStartRange(sBegin, sEnd);
410     }
411 
412     if (checkEnd) {
413       request.setFinishRange(fBegin, fEnd);
414     }
415 
416     if (checkCount) {
417       request.setLimit(countNum);
418     }
419 
420     if (checkAppTypes) {
421       request.setApplicationTypes(appTypes);
422     }
423 
424     if (checkAppTags) {
425       request.setApplicationTags(appTags);
426     }
427 
428     if (checkAppStates) {
429       request.setApplicationStates(appStates);
430     }
431 
432     if (queueQuery != null && !queueQuery.isEmpty()) {
433       ResourceScheduler rs = rm.getResourceScheduler();
434       if (rs instanceof CapacityScheduler) {
435         CapacityScheduler cs = (CapacityScheduler) rs;
436         // validate queue exists
437         try {
438           cs.getQueueInfo(queueQuery, false, false);
439         } catch (IOException e) {
440           throw new BadRequestException(e.getMessage());
441         }
442       }
443       Set<String> queues = new HashSet<String>(1);
444       queues.add(queueQuery);
445       request.setQueues(queues);
446     }
447 
448     if (userQuery != null && !userQuery.isEmpty()) {
449       Set<String> users = new HashSet<String>(1);
450       users.add(userQuery);
451       request.setUsers(users);
452     }
453 
454     List<ApplicationReport> appReports = null;
455     try {
456       appReports = rm.getClientRMService()
457           .getApplications(request, false).getApplicationList();
458     } catch (YarnException e) {
459       LOG.error("Unable to retrieve apps from ClientRMService", e);
460       throw new YarnRuntimeException(
461           "Unable to retrieve apps from ClientRMService", e);
462     }
463 
464     final ConcurrentMap<ApplicationId, RMApp> apps =
465         rm.getRMContext().getRMApps();
466     AppsInfo allApps = new AppsInfo();
467     for (ApplicationReport report : appReports) {
468       RMApp rmapp = apps.get(report.getApplicationId());
469       if (rmapp == null) {
470         continue;
471       }
472 
473       if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) {
474         FinalApplicationStatus.valueOf(finalStatusQuery);
475         if (!rmapp.getFinalApplicationStatus().toString()
476             .equalsIgnoreCase(finalStatusQuery)) {
477           continue;
478         }
479       }
480 
481       AppInfo app = new AppInfo(rm, rmapp,
482           hasAccess(rmapp, hsr), WebAppUtils.getHttpSchemePrefix(conf));
483       allApps.add(app);
484     }
485     return allApps;
486   }
487 
488   @GET
489   @Path("/appstatistics")
490   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getAppStatistics( @ontext HttpServletRequest hsr, @QueryParam(R) Set<String> stateQueries, @QueryParam(R) Set<String> typeQueries)491   public ApplicationStatisticsInfo getAppStatistics(
492       @Context HttpServletRequest hsr,
493       @QueryParam("states") Set<String> stateQueries,
494       @QueryParam("applicationTypes") Set<String> typeQueries) {
495     init();
496 
497     // parse the params and build the scoreboard
498     // converting state/type name to lowercase
499     Set<String> states = parseQueries(stateQueries, true);
500     Set<String> types = parseQueries(typeQueries, false);
501     // if no types, counts the applications of any types
502     if (types.size() == 0) {
503       types.add(ANY);
504     } else if (types.size() != 1) {
505       throw new BadRequestException("# of applicationTypes = " + types.size()
506           + ", we temporarily support at most one applicationType");
507     }
508     // if no states, returns the counts of all RMAppStates
509     if (states.size() == 0) {
510       for (YarnApplicationState state : YarnApplicationState.values()) {
511         states.add(StringUtils.toLowerCase(state.toString()));
512       }
513     }
514     // in case we extend to multiple applicationTypes in the future
515     Map<YarnApplicationState, Map<String, Long>> scoreboard =
516         buildScoreboard(states, types);
517 
518     // go through the apps in RM to count the numbers, ignoring the case of
519     // the state/type name
520     ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext().getRMApps();
521     for (RMApp rmapp : apps.values()) {
522       YarnApplicationState state = rmapp.createApplicationState();
523       String type = StringUtils.toLowerCase(rmapp.getApplicationType().trim());
524       if (states.contains(
525           StringUtils.toLowerCase(state.toString()))) {
526         if (types.contains(ANY)) {
527           countApp(scoreboard, state, ANY);
528         } else if (types.contains(type)) {
529           countApp(scoreboard, state, type);
530         }
531       }
532     }
533 
534     // fill the response object
535     ApplicationStatisticsInfo appStatInfo = new ApplicationStatisticsInfo();
536     for (Map.Entry<YarnApplicationState, Map<String, Long>> partScoreboard
537         : scoreboard.entrySet()) {
538       for (Map.Entry<String, Long> statEntry
539           : partScoreboard.getValue().entrySet()) {
540         StatisticsItemInfo statItem = new StatisticsItemInfo(
541             partScoreboard.getKey(), statEntry.getKey(), statEntry.getValue());
542         appStatInfo.add(statItem);
543       }
544     }
545     return appStatInfo;
546   }
547 
parseQueries( Set<String> queries, boolean isState)548   private static Set<String> parseQueries(
549       Set<String> queries, boolean isState) {
550     Set<String> params = new HashSet<String>();
551     if (!queries.isEmpty()) {
552       for (String query : queries) {
553         if (query != null && !query.trim().isEmpty()) {
554           String[] paramStrs = query.split(",");
555           for (String paramStr : paramStrs) {
556             if (paramStr != null && !paramStr.trim().isEmpty()) {
557               if (isState) {
558                 try {
559                   // enum string is in the uppercase
560                   YarnApplicationState.valueOf(
561                       StringUtils.toUpperCase(paramStr.trim()));
562                 } catch (RuntimeException e) {
563                   YarnApplicationState[] stateArray =
564                       YarnApplicationState.values();
565                   String allAppStates = Arrays.toString(stateArray);
566                   throw new BadRequestException(
567                       "Invalid application-state " + paramStr.trim()
568                       + " specified. It should be one of " + allAppStates);
569                 }
570               }
571               params.add(
572                   StringUtils.toLowerCase(paramStr.trim()));
573             }
574           }
575         }
576       }
577     }
578     return params;
579   }
580 
buildScoreboard( Set<String> states, Set<String> types)581   private static Map<YarnApplicationState, Map<String, Long>> buildScoreboard(
582      Set<String> states, Set<String> types) {
583     Map<YarnApplicationState, Map<String, Long>> scoreboard
584         = new HashMap<YarnApplicationState, Map<String, Long>>();
585     // default states will result in enumerating all YarnApplicationStates
586     assert !states.isEmpty();
587     for (String state : states) {
588       Map<String, Long> partScoreboard = new HashMap<String, Long>();
589       scoreboard.put(
590           YarnApplicationState.valueOf(StringUtils.toUpperCase(state)),
591           partScoreboard);
592       // types is verified no to be empty
593       for (String type : types) {
594         partScoreboard.put(type, 0L);
595       }
596     }
597     return scoreboard;
598   }
599 
countApp( Map<YarnApplicationState, Map<String, Long>> scoreboard, YarnApplicationState state, String type)600   private static void countApp(
601       Map<YarnApplicationState, Map<String, Long>> scoreboard,
602       YarnApplicationState state, String type) {
603     Map<String, Long> partScoreboard = scoreboard.get(state);
604     Long count = partScoreboard.get(type);
605     partScoreboard.put(type, count + 1L);
606   }
607 
608   @GET
609   @Path("/apps/{appid}")
610   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getApp(@ontext HttpServletRequest hsr, @PathParam(R) String appId)611   public AppInfo getApp(@Context HttpServletRequest hsr,
612       @PathParam("appid") String appId) {
613     init();
614     if (appId == null || appId.isEmpty()) {
615       throw new NotFoundException("appId, " + appId + ", is empty or null");
616     }
617     ApplicationId id;
618     id = ConverterUtils.toApplicationId(recordFactory, appId);
619     if (id == null) {
620       throw new NotFoundException("appId is null");
621     }
622     RMApp app = rm.getRMContext().getRMApps().get(id);
623     if (app == null) {
624       throw new NotFoundException("app with id: " + appId + " not found");
625     }
626     return new AppInfo(rm, app, hasAccess(app, hsr), hsr.getScheme() + "://");
627   }
628 
629   @GET
630   @Path("/apps/{appid}/appattempts")
631   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getAppAttempts(@ontext HttpServletRequest hsr, @PathParam(R) String appId)632   public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr,
633       @PathParam("appid") String appId) {
634 
635     init();
636     if (appId == null || appId.isEmpty()) {
637       throw new NotFoundException("appId, " + appId + ", is empty or null");
638     }
639     ApplicationId id;
640     id = ConverterUtils.toApplicationId(recordFactory, appId);
641     if (id == null) {
642       throw new NotFoundException("appId is null");
643     }
644     RMApp app = rm.getRMContext().getRMApps().get(id);
645     if (app == null) {
646       throw new NotFoundException("app with id: " + appId + " not found");
647     }
648 
649     AppAttemptsInfo appAttemptsInfo = new AppAttemptsInfo();
650     for (RMAppAttempt attempt : app.getAppAttempts().values()) {
651       AppAttemptInfo attemptInfo =
652           new AppAttemptInfo(rm, attempt, app.getUser(), hsr.getScheme()
653               + "://");
654       appAttemptsInfo.add(attemptInfo);
655     }
656 
657     return appAttemptsInfo;
658   }
659 
660   @GET
661   @Path("/apps/{appid}/state")
662   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getAppState(@ontext HttpServletRequest hsr, @PathParam(R) String appId)663   public AppState getAppState(@Context HttpServletRequest hsr,
664       @PathParam("appid") String appId) throws AuthorizationException {
665     init();
666     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
667     String userName = "";
668     if (callerUGI != null) {
669       userName = callerUGI.getUserName();
670     }
671     RMApp app = null;
672     try {
673       app = getRMAppForAppId(appId);
674     } catch (NotFoundException e) {
675       RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
676         "UNKNOWN", "RMWebService",
677         "Trying to get state of an absent application " + appId);
678       throw e;
679     }
680 
681     AppState ret = new AppState();
682     ret.setState(app.getState().toString());
683 
684     return ret;
685   }
686 
687   // can't return POJO because we can't control the status code
688   // it's always set to 200 when we need to allow it to be set
689   // to 202
690 
691   @PUT
692   @Path("/apps/{appid}/state")
693   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
694   @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
updateAppState(AppState targetState, @Context HttpServletRequest hsr, @PathParam(R) String appId)695   public Response updateAppState(AppState targetState,
696       @Context HttpServletRequest hsr, @PathParam("appid") String appId)
697       throws AuthorizationException, YarnException, InterruptedException,
698       IOException {
699 
700     init();
701     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
702     if (callerUGI == null) {
703       String msg = "Unable to obtain user name, user not authenticated";
704       throw new AuthorizationException(msg);
705     }
706 
707     if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
708       String msg = "The default static user cannot carry out this operation.";
709       return Response.status(Status.FORBIDDEN).entity(msg).build();
710     }
711 
712     String userName = callerUGI.getUserName();
713     RMApp app = null;
714     try {
715       app = getRMAppForAppId(appId);
716     } catch (NotFoundException e) {
717       RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
718         "UNKNOWN", "RMWebService", "Trying to kill an absent application "
719             + appId);
720       throw e;
721     }
722 
723     if (!app.getState().toString().equals(targetState.getState())) {
724       // user is attempting to change state. right we only
725       // allow users to kill the app
726 
727       if (targetState.getState().equals(YarnApplicationState.KILLED.toString())) {
728         return killApp(app, callerUGI, hsr);
729       }
730       throw new BadRequestException("Only '"
731           + YarnApplicationState.KILLED.toString()
732           + "' is allowed as a target state.");
733     }
734 
735     AppState ret = new AppState();
736     ret.setState(app.getState().toString());
737 
738     return Response.status(Status.OK).entity(ret).build();
739   }
740 
741   @GET
742   @Path("/get-node-to-labels")
743   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getNodeToLabels(@ontext HttpServletRequest hsr)744   public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr)
745     throws IOException {
746     init();
747 
748     NodeToLabelsInfo ntl = new NodeToLabelsInfo();
749     HashMap<String, NodeLabelsInfo> ntlMap = ntl.getNodeToLabels();
750     Map<NodeId, Set<String>> nodeIdToLabels =
751       rm.getRMContext().getNodeLabelManager().getNodeLabels();
752 
753     for (Map.Entry<NodeId, Set<String>> nitle : nodeIdToLabels.entrySet()) {
754       ntlMap.put(nitle.getKey().toString(),
755         new NodeLabelsInfo(nitle.getValue()));
756     }
757 
758     return ntl;
759   }
760 
761   @POST
762   @Path("/replace-node-to-labels")
763   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
replaceLabelsOnNodes( final NodeToLabelsInfo newNodeToLabels, @Context HttpServletRequest hsr)764   public Response replaceLabelsOnNodes(
765     final NodeToLabelsInfo newNodeToLabels,
766     @Context HttpServletRequest hsr)
767     throws IOException {
768     init();
769 
770     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
771     if (callerUGI == null) {
772       String msg = "Unable to obtain user name, user not authenticated for"
773         + " post to .../replace-node-to-labels";
774       throw new AuthorizationException(msg);
775     }
776     if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
777       String msg = "User " + callerUGI.getShortUserName() + " not authorized"
778         + " for post to .../replace-node-to-labels ";
779       throw new AuthorizationException(msg);
780     }
781 
782     Map<NodeId, Set<String>> nodeIdToLabels =
783       new HashMap<NodeId, Set<String>>();
784 
785     for (Map.Entry<String, NodeLabelsInfo> nitle :
786       newNodeToLabels.getNodeToLabels().entrySet()) {
787      nodeIdToLabels.put(ConverterUtils.toNodeIdWithDefaultPort(nitle.getKey()),
788        new HashSet<String>(nitle.getValue().getNodeLabels()));
789     }
790 
791     rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(nodeIdToLabels);
792 
793     return Response.status(Status.OK).build();
794   }
795 
796   @GET
797   @Path("/get-node-labels")
798   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getClusterNodeLabels(@ontext HttpServletRequest hsr)799   public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr)
800     throws IOException {
801     init();
802 
803     NodeLabelsInfo ret =
804       new NodeLabelsInfo(rm.getRMContext().getNodeLabelManager()
805         .getClusterNodeLabels());
806 
807     return ret;
808   }
809 
810   @POST
811   @Path("/add-node-labels")
812   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
addToClusterNodeLabels(final NodeLabelsInfo newNodeLabels, @Context HttpServletRequest hsr)813   public Response addToClusterNodeLabels(final NodeLabelsInfo newNodeLabels,
814       @Context HttpServletRequest hsr)
815       throws Exception {
816     init();
817 
818     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
819     if (callerUGI == null) {
820       String msg = "Unable to obtain user name, user not authenticated for"
821         + " post to .../add-node-labels";
822       throw new AuthorizationException(msg);
823     }
824     if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
825       String msg = "User " + callerUGI.getShortUserName() + " not authorized"
826         + " for post to .../add-node-labels ";
827       throw new AuthorizationException(msg);
828     }
829 
830     rm.getRMContext().getNodeLabelManager()
831         .addToCluserNodeLabels(new HashSet<String>(
832           newNodeLabels.getNodeLabels()));
833 
834     return Response.status(Status.OK).build();
835 
836   }
837 
838   @POST
839   @Path("/remove-node-labels")
840   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
removeFromCluserNodeLabels(final NodeLabelsInfo oldNodeLabels, @Context HttpServletRequest hsr)841   public Response removeFromCluserNodeLabels(final NodeLabelsInfo oldNodeLabels,
842       @Context HttpServletRequest hsr)
843       throws Exception {
844     init();
845 
846     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
847     if (callerUGI == null) {
848       String msg = "Unable to obtain user name, user not authenticated for"
849         + " post to .../remove-node-labels";
850       throw new AuthorizationException(msg);
851     }
852     if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
853       String msg = "User " + callerUGI.getShortUserName() + " not authorized"
854         + " for post to .../remove-node-labels ";
855       throw new AuthorizationException(msg);
856     }
857 
858     rm.getRMContext().getNodeLabelManager()
859         .removeFromClusterNodeLabels(new HashSet<String>(
860           oldNodeLabels.getNodeLabels()));
861 
862     return Response.status(Status.OK).build();
863 
864   }
865 
866   @GET
867   @Path("/nodes/{nodeId}/get-labels")
868   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getLabelsOnNode(@ontext HttpServletRequest hsr, @PathParam(R) String nodeId)869   public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr,
870                                   @PathParam("nodeId") String nodeId)
871     throws IOException {
872     init();
873 
874     NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId);
875     return new NodeLabelsInfo(
876       rm.getRMContext().getNodeLabelManager().getLabelsOnNode(nid));
877 
878   }
879 
880   @POST
881   @Path("/nodes/{nodeId}/replace-labels")
882   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
replaceLabelsOnNode(NodeLabelsInfo newNodeLabelsInfo, @Context HttpServletRequest hsr, @PathParam(R) String nodeId)883   public Response replaceLabelsOnNode(NodeLabelsInfo newNodeLabelsInfo,
884       @Context HttpServletRequest hsr, @PathParam("nodeId") String nodeId)
885       throws Exception {
886     init();
887 
888     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
889     if (callerUGI == null) {
890       String msg = "Unable to obtain user name, user not authenticated for"
891         + " post to .../nodes/nodeid/replace-labels";
892       throw new AuthorizationException(msg);
893     }
894 
895     if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
896       String msg = "User " + callerUGI.getShortUserName() + " not authorized"
897         + " for post to .../nodes/nodeid/replace-labels";
898       throw new AuthorizationException(msg);
899     }
900 
901     NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId);
902 
903     Map<NodeId, Set<String>> newLabelsForNode = new HashMap<NodeId,
904       Set<String>>();
905 
906     newLabelsForNode.put(nid, new HashSet<String>(newNodeLabelsInfo.getNodeLabels()));
907 
908     rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(newLabelsForNode);
909 
910     return Response.status(Status.OK).build();
911 
912   }
913 
killApp(RMApp app, UserGroupInformation callerUGI, HttpServletRequest hsr)914   protected Response killApp(RMApp app, UserGroupInformation callerUGI,
915       HttpServletRequest hsr) throws IOException, InterruptedException {
916 
917     if (app == null) {
918       throw new IllegalArgumentException("app cannot be null");
919     }
920     String userName = callerUGI.getUserName();
921     final ApplicationId appid = app.getApplicationId();
922     KillApplicationResponse resp = null;
923     try {
924       resp =
925           callerUGI
926             .doAs(new PrivilegedExceptionAction<KillApplicationResponse>() {
927               @Override
928               public KillApplicationResponse run() throws IOException,
929                   YarnException {
930                 KillApplicationRequest req =
931                     KillApplicationRequest.newInstance(appid);
932                 return rm.getClientRMService().forceKillApplication(req);
933               }
934             });
935     } catch (UndeclaredThrowableException ue) {
936       // if the root cause is a permissions issue
937       // bubble that up to the user
938       if (ue.getCause() instanceof YarnException) {
939         YarnException ye = (YarnException) ue.getCause();
940         if (ye.getCause() instanceof AccessControlException) {
941           String appId = app.getApplicationId().toString();
942           String msg =
943               "Unauthorized attempt to kill appid " + appId
944                   + " by remote user " + userName;
945           return Response.status(Status.FORBIDDEN).entity(msg).build();
946         } else {
947           throw ue;
948         }
949       } else {
950         throw ue;
951       }
952     }
953 
954     AppState ret = new AppState();
955     ret.setState(app.getState().toString());
956 
957     if (resp.getIsKillCompleted()) {
958       RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST,
959         "RMWebService", app.getApplicationId());
960     } else {
961       return Response.status(Status.ACCEPTED).entity(ret)
962         .header(HttpHeaders.LOCATION, hsr.getRequestURL()).build();
963     }
964     return Response.status(Status.OK).entity(ret).build();
965   }
966 
967   @GET
968   @Path("/apps/{appid}/queue")
969   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
getAppQueue(@ontext HttpServletRequest hsr, @PathParam(R) String appId)970   public AppQueue getAppQueue(@Context HttpServletRequest hsr,
971       @PathParam("appid") String appId) throws AuthorizationException {
972     init();
973     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
974     String userName = "UNKNOWN-USER";
975     if (callerUGI != null) {
976       userName = callerUGI.getUserName();
977     }
978     RMApp app = null;
979     try {
980       app = getRMAppForAppId(appId);
981     } catch (NotFoundException e) {
982       RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
983         "UNKNOWN", "RMWebService",
984         "Trying to get state of an absent application " + appId);
985       throw e;
986     }
987 
988     AppQueue ret = new AppQueue();
989     ret.setQueue(app.getQueue());
990 
991     return ret;
992   }
993 
994   @PUT
995   @Path("/apps/{appid}/queue")
996   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
997   @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
updateAppQueue(AppQueue targetQueue, @Context HttpServletRequest hsr, @PathParam(R) String appId)998   public Response updateAppQueue(AppQueue targetQueue,
999       @Context HttpServletRequest hsr, @PathParam("appid") String appId)
1000       throws AuthorizationException, YarnException, InterruptedException,
1001       IOException {
1002 
1003     init();
1004     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
1005     if (callerUGI == null) {
1006       String msg = "Unable to obtain user name, user not authenticated";
1007       throw new AuthorizationException(msg);
1008     }
1009 
1010     if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
1011       String msg = "The default static user cannot carry out this operation.";
1012       return Response.status(Status.FORBIDDEN).entity(msg).build();
1013     }
1014 
1015     String userName = callerUGI.getUserName();
1016     RMApp app = null;
1017     try {
1018       app = getRMAppForAppId(appId);
1019     } catch (NotFoundException e) {
1020       RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
1021         "UNKNOWN", "RMWebService", "Trying to move an absent application "
1022             + appId);
1023       throw e;
1024     }
1025 
1026     if (!app.getQueue().equals(targetQueue.getQueue())) {
1027       // user is attempting to change queue.
1028       return moveApp(app, callerUGI, targetQueue.getQueue());
1029     }
1030 
1031     AppQueue ret = new AppQueue();
1032     ret.setQueue(app.getQueue());
1033 
1034     return Response.status(Status.OK).entity(ret).build();
1035   }
1036 
moveApp(RMApp app, UserGroupInformation callerUGI, String targetQueue)1037   protected Response moveApp(RMApp app, UserGroupInformation callerUGI,
1038       String targetQueue) throws IOException, InterruptedException {
1039 
1040     if (app == null) {
1041       throw new IllegalArgumentException("app cannot be null");
1042     }
1043     String userName = callerUGI.getUserName();
1044     final ApplicationId appid = app.getApplicationId();
1045     final String reqTargetQueue = targetQueue;
1046     try {
1047       callerUGI
1048         .doAs(new PrivilegedExceptionAction<Void>() {
1049           @Override
1050           public Void run() throws IOException,
1051               YarnException {
1052             MoveApplicationAcrossQueuesRequest req =
1053                 MoveApplicationAcrossQueuesRequest.newInstance(appid,
1054                   reqTargetQueue);
1055             rm.getClientRMService().moveApplicationAcrossQueues(req);
1056             return null;
1057           }
1058         });
1059     } catch (UndeclaredThrowableException ue) {
1060       // if the root cause is a permissions issue
1061       // bubble that up to the user
1062       if (ue.getCause() instanceof YarnException) {
1063         YarnException ye = (YarnException) ue.getCause();
1064         if (ye.getCause() instanceof AccessControlException) {
1065           String appId = app.getApplicationId().toString();
1066           String msg =
1067               "Unauthorized attempt to move appid " + appId
1068                   + " by remote user " + userName;
1069           return Response.status(Status.FORBIDDEN).entity(msg).build();
1070         } else if (ye.getMessage().startsWith("App in")
1071             && ye.getMessage().endsWith("state cannot be moved.")) {
1072           return Response.status(Status.BAD_REQUEST).entity(ye.getMessage())
1073             .build();
1074         } else {
1075           throw ue;
1076         }
1077       } else {
1078         throw ue;
1079       }
1080     }
1081 
1082     AppQueue ret = new AppQueue();
1083     ret.setQueue(app.getQueue());
1084     return Response.status(Status.OK).entity(ret).build();
1085   }
1086 
getRMAppForAppId(String appId)1087   private RMApp getRMAppForAppId(String appId) {
1088 
1089     if (appId == null || appId.isEmpty()) {
1090       throw new NotFoundException("appId, " + appId + ", is empty or null");
1091     }
1092     ApplicationId id;
1093     try {
1094       id = ConverterUtils.toApplicationId(recordFactory, appId);
1095     } catch (NumberFormatException e) {
1096       throw new NotFoundException("appId is invalid");
1097     }
1098     if (id == null) {
1099       throw new NotFoundException("appId is invalid");
1100     }
1101     RMApp app = rm.getRMContext().getRMApps().get(id);
1102     if (app == null) {
1103       throw new NotFoundException("app with id: " + appId + " not found");
1104     }
1105     return app;
1106   }
1107 
getCallerUserGroupInformation( HttpServletRequest hsr, boolean usePrincipal)1108   private UserGroupInformation getCallerUserGroupInformation(
1109       HttpServletRequest hsr, boolean usePrincipal) {
1110 
1111     String remoteUser = hsr.getRemoteUser();
1112     if (usePrincipal) {
1113       Principal princ = hsr.getUserPrincipal();
1114       remoteUser = princ == null ? null : princ.getName();
1115     }
1116 
1117     UserGroupInformation callerUGI = null;
1118     if (remoteUser != null) {
1119       callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
1120     }
1121 
1122     return callerUGI;
1123   }
1124 
isStaticUser(UserGroupInformation callerUGI)1125   private boolean isStaticUser(UserGroupInformation callerUGI) {
1126     String staticUser =
1127         conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER,
1128           CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER);
1129     return staticUser.equals(callerUGI.getUserName());
1130   }
1131 
1132   /**
1133    * Generates a new ApplicationId which is then sent to the client
1134    *
1135    * @param hsr
1136    *          the servlet request
1137    * @return Response containing the app id and the maximum resource
1138    *         capabilities
1139    * @throws AuthorizationException
1140    * @throws IOException
1141    * @throws InterruptedException
1142    */
1143   @POST
1144   @Path("/apps/new-application")
1145   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
createNewApplication(@ontext HttpServletRequest hsr)1146   public Response createNewApplication(@Context HttpServletRequest hsr)
1147       throws AuthorizationException, IOException, InterruptedException {
1148     init();
1149     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
1150     if (callerUGI == null) {
1151       throw new AuthorizationException("Unable to obtain user name, "
1152           + "user not authenticated");
1153     }
1154     if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
1155       String msg = "The default static user cannot carry out this operation.";
1156       return Response.status(Status.FORBIDDEN).entity(msg).build();
1157     }
1158 
1159     NewApplication appId = createNewApplication();
1160     return Response.status(Status.OK).entity(appId).build();
1161 
1162   }
1163 
1164   // reuse the code in ClientRMService to create new app
1165   // get the new app id and submit app
1166   // set location header with new app location
1167   /**
1168    * Function to submit an app to the RM
1169    *
1170    * @param newApp
1171    *          structure containing information to construct the
1172    *          ApplicationSubmissionContext
1173    * @param hsr
1174    *          the servlet request
1175    * @return Response containing the status code
1176    * @throws AuthorizationException
1177    * @throws IOException
1178    * @throws InterruptedException
1179    */
1180   @POST
1181   @Path("/apps")
1182   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
1183   @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
submitApplication(ApplicationSubmissionContextInfo newApp, @Context HttpServletRequest hsr)1184   public Response submitApplication(ApplicationSubmissionContextInfo newApp,
1185       @Context HttpServletRequest hsr) throws AuthorizationException,
1186       IOException, InterruptedException {
1187 
1188     init();
1189     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
1190     if (callerUGI == null) {
1191       throw new AuthorizationException("Unable to obtain user name, "
1192           + "user not authenticated");
1193     }
1194 
1195     if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
1196       String msg = "The default static user cannot carry out this operation.";
1197       return Response.status(Status.FORBIDDEN).entity(msg).build();
1198     }
1199 
1200     ApplicationSubmissionContext appContext =
1201         createAppSubmissionContext(newApp);
1202     final SubmitApplicationRequest req =
1203         SubmitApplicationRequest.newInstance(appContext);
1204 
1205     try {
1206       callerUGI
1207         .doAs(new PrivilegedExceptionAction<SubmitApplicationResponse>() {
1208           @Override
1209           public SubmitApplicationResponse run() throws IOException,
1210               YarnException {
1211             return rm.getClientRMService().submitApplication(req);
1212           }
1213         });
1214     } catch (UndeclaredThrowableException ue) {
1215       if (ue.getCause() instanceof YarnException) {
1216         throw new BadRequestException(ue.getCause().getMessage());
1217       }
1218       LOG.info("Submit app request failed", ue);
1219       throw ue;
1220     }
1221 
1222     String url = hsr.getRequestURL() + "/" + newApp.getApplicationId();
1223     return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, url)
1224       .build();
1225   }
1226 
1227   /**
1228    * Function that actually creates the ApplicationId by calling the
1229    * ClientRMService
1230    *
1231    * @return returns structure containing the app-id and maximum resource
1232    *         capabilities
1233    */
createNewApplication()1234   private NewApplication createNewApplication() {
1235     GetNewApplicationRequest req =
1236         recordFactory.newRecordInstance(GetNewApplicationRequest.class);
1237     GetNewApplicationResponse resp;
1238     try {
1239       resp = rm.getClientRMService().getNewApplication(req);
1240     } catch (YarnException e) {
1241       String msg = "Unable to create new app from RM web service";
1242       LOG.error(msg, e);
1243       throw new YarnRuntimeException(msg, e);
1244     }
1245     NewApplication appId =
1246         new NewApplication(resp.getApplicationId().toString(),
1247           new ResourceInfo(resp.getMaximumResourceCapability()));
1248     return appId;
1249   }
1250 
1251   /**
1252    * Create the actual ApplicationSubmissionContext to be submitted to the RM
1253    * from the information provided by the user.
1254    *
1255    * @param newApp
1256    *          the information provided by the user
1257    * @return returns the constructed ApplicationSubmissionContext
1258    * @throws IOException
1259    */
createAppSubmissionContext( ApplicationSubmissionContextInfo newApp)1260   protected ApplicationSubmissionContext createAppSubmissionContext(
1261       ApplicationSubmissionContextInfo newApp) throws IOException {
1262 
1263     // create local resources and app submission context
1264 
1265     ApplicationId appid;
1266     String error =
1267         "Could not parse application id " + newApp.getApplicationId();
1268     try {
1269       appid =
1270           ConverterUtils.toApplicationId(recordFactory,
1271             newApp.getApplicationId());
1272     } catch (Exception e) {
1273       throw new BadRequestException(error);
1274     }
1275     ApplicationSubmissionContext appContext =
1276         ApplicationSubmissionContext.newInstance(appid,
1277           newApp.getApplicationName(), newApp.getQueue(),
1278           Priority.newInstance(newApp.getPriority()),
1279           createContainerLaunchContext(newApp), newApp.getUnmanagedAM(),
1280           newApp.getCancelTokensWhenComplete(), newApp.getMaxAppAttempts(),
1281           createAppSubmissionContextResource(newApp),
1282           newApp.getApplicationType(),
1283           newApp.getKeepContainersAcrossApplicationAttempts(),
1284           newApp.getAppNodeLabelExpression(),
1285           newApp.getAMContainerNodeLabelExpression());
1286     appContext.setApplicationTags(newApp.getApplicationTags());
1287 
1288     return appContext;
1289   }
1290 
createAppSubmissionContextResource( ApplicationSubmissionContextInfo newApp)1291   protected Resource createAppSubmissionContextResource(
1292       ApplicationSubmissionContextInfo newApp) throws BadRequestException {
1293     if (newApp.getResource().getvCores() > rm.getConfig().getInt(
1294       YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
1295       YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)) {
1296       String msg = "Requested more cores than configured max";
1297       throw new BadRequestException(msg);
1298     }
1299     if (newApp.getResource().getMemory() > rm.getConfig().getInt(
1300       YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
1301       YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
1302       String msg = "Requested more memory than configured max";
1303       throw new BadRequestException(msg);
1304     }
1305     Resource r =
1306         Resource.newInstance(newApp.getResource().getMemory(), newApp
1307           .getResource().getvCores());
1308     return r;
1309   }
1310 
1311   /**
1312    * Create the ContainerLaunchContext required for the
1313    * ApplicationSubmissionContext. This function takes the user information and
1314    * generates the ByteBuffer structures required by the ContainerLaunchContext
1315    *
1316    * @param newApp
1317    *          the information provided by the user
1318    * @return created context
1319    * @throws BadRequestException
1320    * @throws IOException
1321    */
createContainerLaunchContext( ApplicationSubmissionContextInfo newApp)1322   protected ContainerLaunchContext createContainerLaunchContext(
1323       ApplicationSubmissionContextInfo newApp) throws BadRequestException,
1324       IOException {
1325 
1326     // create container launch context
1327 
1328     HashMap<String, ByteBuffer> hmap = new HashMap<String, ByteBuffer>();
1329     for (Map.Entry<String, String> entry : newApp
1330       .getContainerLaunchContextInfo().getAuxillaryServiceData().entrySet()) {
1331       if (entry.getValue().isEmpty() == false) {
1332         Base64 decoder = new Base64(0, null, true);
1333         byte[] data = decoder.decode(entry.getValue());
1334         hmap.put(entry.getKey(), ByteBuffer.wrap(data));
1335       }
1336     }
1337 
1338     HashMap<String, LocalResource> hlr = new HashMap<String, LocalResource>();
1339     for (Map.Entry<String, LocalResourceInfo> entry : newApp
1340       .getContainerLaunchContextInfo().getResources().entrySet()) {
1341       LocalResourceInfo l = entry.getValue();
1342       LocalResource lr =
1343           LocalResource.newInstance(
1344             ConverterUtils.getYarnUrlFromURI(l.getUrl()), l.getType(),
1345             l.getVisibility(), l.getSize(), l.getTimestamp());
1346       hlr.put(entry.getKey(), lr);
1347     }
1348 
1349     DataOutputBuffer out = new DataOutputBuffer();
1350     Credentials cs =
1351         createCredentials(newApp.getContainerLaunchContextInfo()
1352           .getCredentials());
1353     cs.writeTokenStorageToStream(out);
1354     ByteBuffer tokens = ByteBuffer.wrap(out.getData());
1355 
1356     ContainerLaunchContext ctx =
1357         ContainerLaunchContext.newInstance(hlr, newApp
1358           .getContainerLaunchContextInfo().getEnvironment(), newApp
1359           .getContainerLaunchContextInfo().getCommands(), hmap, tokens, newApp
1360           .getContainerLaunchContextInfo().getAcls());
1361 
1362     return ctx;
1363   }
1364 
1365   /**
1366    * Generate a Credentials object from the information in the CredentialsInfo
1367    * object.
1368    *
1369    * @param credentials
1370    *          the CredentialsInfo provided by the user.
1371    * @return
1372    */
createCredentials(CredentialsInfo credentials)1373   private Credentials createCredentials(CredentialsInfo credentials) {
1374     Credentials ret = new Credentials();
1375     try {
1376       for (Map.Entry<String, String> entry : credentials.getTokens().entrySet()) {
1377         Text alias = new Text(entry.getKey());
1378         Token<TokenIdentifier> token = new Token<TokenIdentifier>();
1379         token.decodeFromUrlString(entry.getValue());
1380         ret.addToken(alias, token);
1381       }
1382       for (Map.Entry<String, String> entry : credentials.getSecrets().entrySet()) {
1383         Text alias = new Text(entry.getKey());
1384         Base64 decoder = new Base64(0, null, true);
1385         byte[] secret = decoder.decode(entry.getValue());
1386         ret.addSecretKey(alias, secret);
1387       }
1388     } catch (IOException ie) {
1389       throw new BadRequestException(
1390         "Could not parse credentials data; exception message = "
1391             + ie.getMessage());
1392     }
1393     return ret;
1394   }
1395 
createKerberosUserGroupInformation( HttpServletRequest hsr)1396   private UserGroupInformation createKerberosUserGroupInformation(
1397       HttpServletRequest hsr) throws AuthorizationException, YarnException {
1398 
1399     UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
1400     if (callerUGI == null) {
1401       String msg = "Unable to obtain user name, user not authenticated";
1402       throw new AuthorizationException(msg);
1403     }
1404 
1405     String authType = hsr.getAuthType();
1406     if (!KerberosAuthenticationHandler.TYPE.equalsIgnoreCase(authType)) {
1407       String msg =
1408           "Delegation token operations can only be carried out on a "
1409               + "Kerberos authenticated channel. Expected auth type is "
1410               + KerberosAuthenticationHandler.TYPE + ", got type " + authType;
1411       throw new YarnException(msg);
1412     }
1413     if (hsr
1414       .getAttribute(DelegationTokenAuthenticationHandler.DELEGATION_TOKEN_UGI_ATTRIBUTE) != null) {
1415       String msg =
1416           "Delegation token operations cannot be carried out using delegation"
1417               + " token authentication.";
1418       throw new YarnException(msg);
1419     }
1420 
1421     callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
1422     return callerUGI;
1423   }
1424 
1425   @POST
1426   @Path("/delegation-token")
1427   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
1428   @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
postDelegationToken(DelegationToken tokenData, @Context HttpServletRequest hsr)1429   public Response postDelegationToken(DelegationToken tokenData,
1430       @Context HttpServletRequest hsr) throws AuthorizationException,
1431       IOException, InterruptedException, Exception {
1432 
1433     init();
1434     UserGroupInformation callerUGI;
1435     try {
1436       callerUGI = createKerberosUserGroupInformation(hsr);
1437     } catch (YarnException ye) {
1438       return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
1439     }
1440     return createDelegationToken(tokenData, hsr, callerUGI);
1441   }
1442 
1443   @POST
1444   @Path("/delegation-token/expiration")
1445   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
1446   @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
1447   public Response
postDelegationTokenExpiration(@ontext HttpServletRequest hsr)1448       postDelegationTokenExpiration(@Context HttpServletRequest hsr)
1449           throws AuthorizationException, IOException, InterruptedException,
1450           Exception {
1451 
1452     init();
1453     UserGroupInformation callerUGI;
1454     try {
1455       callerUGI = createKerberosUserGroupInformation(hsr);
1456     } catch (YarnException ye) {
1457       return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
1458     }
1459 
1460     DelegationToken requestToken = new DelegationToken();
1461     requestToken.setToken(extractToken(hsr).encodeToUrlString());
1462     return renewDelegationToken(requestToken, hsr, callerUGI);
1463   }
1464 
createDelegationToken(DelegationToken tokenData, HttpServletRequest hsr, UserGroupInformation callerUGI)1465   private Response createDelegationToken(DelegationToken tokenData,
1466       HttpServletRequest hsr, UserGroupInformation callerUGI)
1467       throws AuthorizationException, IOException, InterruptedException,
1468       Exception {
1469 
1470     final String renewer = tokenData.getRenewer();
1471     GetDelegationTokenResponse resp;
1472     try {
1473       resp =
1474           callerUGI
1475             .doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() {
1476               @Override
1477               public GetDelegationTokenResponse run() throws IOException,
1478                   YarnException {
1479                 GetDelegationTokenRequest createReq =
1480                     GetDelegationTokenRequest.newInstance(renewer);
1481                 return rm.getClientRMService().getDelegationToken(createReq);
1482               }
1483             });
1484     } catch (Exception e) {
1485       LOG.info("Create delegation token request failed", e);
1486       throw e;
1487     }
1488 
1489     Token<RMDelegationTokenIdentifier> tk =
1490         new Token<RMDelegationTokenIdentifier>(resp.getRMDelegationToken()
1491           .getIdentifier().array(), resp.getRMDelegationToken().getPassword()
1492           .array(), new Text(resp.getRMDelegationToken().getKind()), new Text(
1493           resp.getRMDelegationToken().getService()));
1494     RMDelegationTokenIdentifier identifier = tk.decodeIdentifier();
1495     long currentExpiration =
1496         rm.getRMContext().getRMDelegationTokenSecretManager()
1497           .getRenewDate(identifier);
1498     DelegationToken respToken =
1499         new DelegationToken(tk.encodeToUrlString(), renewer, identifier
1500           .getOwner().toString(), tk.getKind().toString(), currentExpiration,
1501           identifier.getMaxDate());
1502     return Response.status(Status.OK).entity(respToken).build();
1503   }
1504 
renewDelegationToken(DelegationToken tokenData, HttpServletRequest hsr, UserGroupInformation callerUGI)1505   private Response renewDelegationToken(DelegationToken tokenData,
1506       HttpServletRequest hsr, UserGroupInformation callerUGI)
1507       throws AuthorizationException, IOException, InterruptedException,
1508       Exception {
1509 
1510     Token<RMDelegationTokenIdentifier> token =
1511         extractToken(tokenData.getToken());
1512 
1513     org.apache.hadoop.yarn.api.records.Token dToken =
1514         BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
1515           .toString(), token.getPassword(), token.getService().toString());
1516     final RenewDelegationTokenRequest req =
1517         RenewDelegationTokenRequest.newInstance(dToken);
1518 
1519     RenewDelegationTokenResponse resp;
1520     try {
1521       resp =
1522           callerUGI
1523             .doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() {
1524               @Override
1525               public RenewDelegationTokenResponse run() throws IOException,
1526                   YarnException {
1527                 return rm.getClientRMService().renewDelegationToken(req);
1528               }
1529             });
1530     } catch (UndeclaredThrowableException ue) {
1531       if (ue.getCause() instanceof YarnException) {
1532         if (ue.getCause().getCause() instanceof InvalidToken) {
1533           throw new BadRequestException(ue.getCause().getCause().getMessage());
1534         } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
1535           return Response.status(Status.FORBIDDEN)
1536             .entity(ue.getCause().getCause().getMessage()).build();
1537         }
1538         LOG.info("Renew delegation token request failed", ue);
1539         throw ue;
1540       }
1541       LOG.info("Renew delegation token request failed", ue);
1542       throw ue;
1543     } catch (Exception e) {
1544       LOG.info("Renew delegation token request failed", e);
1545       throw e;
1546     }
1547     long renewTime = resp.getNextExpirationTime();
1548 
1549     DelegationToken respToken = new DelegationToken();
1550     respToken.setNextExpirationTime(renewTime);
1551     return Response.status(Status.OK).entity(respToken).build();
1552   }
1553 
1554   // For cancelling tokens, the encoded token is passed as a header
1555   // There are two reasons for this -
1556   // 1. Passing a request body as part of a DELETE request is not
1557   // allowed by Jetty
1558   // 2. Passing the encoded token as part of the url is not ideal
1559   // since urls tend to get logged and anyone with access to
1560   // the logs can extract tokens which are meant to be secret
1561   @DELETE
1562   @Path("/delegation-token")
1563   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
cancelDelegationToken(@ontext HttpServletRequest hsr)1564   public Response cancelDelegationToken(@Context HttpServletRequest hsr)
1565       throws AuthorizationException, IOException, InterruptedException,
1566       Exception {
1567 
1568     init();
1569     UserGroupInformation callerUGI;
1570     try {
1571       callerUGI = createKerberosUserGroupInformation(hsr);
1572     } catch (YarnException ye) {
1573       return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
1574     }
1575 
1576     Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
1577 
1578     org.apache.hadoop.yarn.api.records.Token dToken =
1579         BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
1580           .toString(), token.getPassword(), token.getService().toString());
1581     final CancelDelegationTokenRequest req =
1582         CancelDelegationTokenRequest.newInstance(dToken);
1583 
1584     try {
1585       callerUGI
1586         .doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() {
1587           @Override
1588           public CancelDelegationTokenResponse run() throws IOException,
1589               YarnException {
1590             return rm.getClientRMService().cancelDelegationToken(req);
1591           }
1592         });
1593     } catch (UndeclaredThrowableException ue) {
1594       if (ue.getCause() instanceof YarnException) {
1595         if (ue.getCause().getCause() instanceof InvalidToken) {
1596           throw new BadRequestException(ue.getCause().getCause().getMessage());
1597         } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
1598           return Response.status(Status.FORBIDDEN)
1599             .entity(ue.getCause().getCause().getMessage()).build();
1600         }
1601         LOG.info("Renew delegation token request failed", ue);
1602         throw ue;
1603       }
1604       LOG.info("Renew delegation token request failed", ue);
1605       throw ue;
1606     } catch (Exception e) {
1607       LOG.info("Renew delegation token request failed", e);
1608       throw e;
1609     }
1610 
1611     return Response.status(Status.OK).build();
1612   }
1613 
extractToken( HttpServletRequest request)1614   private Token<RMDelegationTokenIdentifier> extractToken(
1615       HttpServletRequest request) {
1616     String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER);
1617     if (encodedToken == null) {
1618       String msg =
1619           "Header '" + DELEGATION_TOKEN_HEADER
1620               + "' containing encoded token not found";
1621       throw new BadRequestException(msg);
1622     }
1623     return extractToken(encodedToken);
1624   }
1625 
extractToken(String encodedToken)1626   private Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) {
1627     Token<RMDelegationTokenIdentifier> token =
1628         new Token<RMDelegationTokenIdentifier>();
1629     try {
1630       token.decodeFromUrlString(encodedToken);
1631     } catch (Exception ie) {
1632       String msg = "Could not decode encoded token";
1633       throw new BadRequestException(msg);
1634     }
1635     return token;
1636   }
1637 }
1638