/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.util;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.ContainerRollingLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
 * Helper class for MR applications.
 */
@Private
@Unstable
public class MRApps extends Apps {
  public static final Log LOG = LogFactory.getLog(MRApps.class);

  public static String toString(JobId jid) {
    return jid.toString();
  }

  public static JobId toJobID(String jid) {
    return TypeConverter.toYarn(JobID.forName(jid));
  }

  public static String toString(TaskId tid) {
    return tid.toString();
  }

  public static TaskId toTaskID(String tid) {
    return TypeConverter.toYarn(TaskID.forName(tid));
  }

  public static String toString(TaskAttemptId taid) {
    return taid.toString();
  }

  public static TaskAttemptId toTaskAttemptID(String taid) {
    return TypeConverter.toYarn(TaskAttemptID.forName(taid));
  }

  public static String taskSymbol(TaskType type) {
    switch (type) {
      case MAP:    return "m";
      case REDUCE: return "r";
    }
    throw new YarnRuntimeException("Unknown task type: " + type.toString());
  }

  public static enum TaskAttemptStateUI {
    NEW(
        new TaskAttemptState[] { TaskAttemptState.NEW,
            TaskAttemptState.STARTING }),
    RUNNING(
        new TaskAttemptState[] { TaskAttemptState.RUNNING,
            TaskAttemptState.COMMIT_PENDING }),
    SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED }),
    FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED }),
    KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED });

    private final List<TaskAttemptState> correspondingStates;

    private TaskAttemptStateUI(TaskAttemptState[] correspondingStates) {
      this.correspondingStates = Arrays.asList(correspondingStates);
    }

    public boolean correspondsTo(TaskAttemptState state) {
      return this.correspondingStates.contains(state);
    }
  }

  public static enum TaskStateUI {
    RUNNING(
        new TaskState[] { TaskState.RUNNING }),
    PENDING(new TaskState[] { TaskState.SCHEDULED }),
    COMPLETED(new TaskState[] { TaskState.SUCCEEDED, TaskState.FAILED,
        TaskState.KILLED });

    private final List<TaskState> correspondingStates;

    private TaskStateUI(TaskState[] correspondingStates) {
      this.correspondingStates = Arrays.asList(correspondingStates);
    }

    public boolean correspondsTo(TaskState state) {
      return this.correspondingStates.contains(state);
    }
  }

  public static TaskType taskType(String symbol) {
    // JDK 7 supports switch on strings
    if (symbol.equals("m")) return TaskType.MAP;
    if (symbol.equals("r")) return TaskType.REDUCE;
    throw new YarnRuntimeException("Unknown task symbol: " + symbol);
  }

  public static TaskAttemptStateUI taskAttemptState(String attemptStateStr) {
    return TaskAttemptStateUI.valueOf(attemptStateStr);
  }

  public static TaskStateUI taskState(String taskStateStr) {
    return TaskStateUI.valueOf(taskStateStr);
  }
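  // Illustrative round-trips through the converters above (the IDs are
  // made-up example values):
  //   JobId jobId = MRApps.toJobID("job_1368697171717_0001");
  //   assert "job_1368697171717_0001".equals(MRApps.toString(jobId));
  //   assert TaskType.MAP == MRApps.taskType(MRApps.taskSymbol(TaskType.MAP));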
  // Gets the base name of the MapReduce framework or null if no
  // framework was configured.
  private static String getMRFrameworkName(Configuration conf) {
    String frameworkName = null;
    String framework =
        conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, "");
    if (!framework.isEmpty()) {
      URI uri;
      try {
        uri = new URI(framework);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException("Unable to parse '" + framework
            + "' as a URI, check the setting for "
            + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e);
      }

      frameworkName = uri.getFragment();
      if (frameworkName == null) {
        frameworkName = new Path(uri).getName();
      }
    }
    return frameworkName;
  }

  private static void setMRFrameworkClasspath(
      Map<String, String> environment, Configuration conf) throws IOException {
    // Propagate the system classpath when using the mini cluster
    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
      MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
          System.getProperty("java.class.path"), conf);
    }
    boolean crossPlatform =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
            MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);

    // If the framework is specified then only use the MR classpath.
    String frameworkName = getMRFrameworkName(conf);
    if (frameworkName == null) {
      // Add standard Hadoop classes
      for (String c : conf.getStrings(
          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
          crossPlatform
              ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH
              : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
        MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
            c.trim(), conf);
      }
    }

    boolean foundFrameworkInClasspath = (frameworkName == null);
    for (String c : conf.getStrings(
        MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
        crossPlatform
            ? StringUtils.getStrings(
                MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH)
            : StringUtils.getStrings(
                MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
      MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
          c.trim(), conf);
      if (!foundFrameworkInClasspath) {
        foundFrameworkInClasspath = c.contains(frameworkName);
      }
    }

    if (!foundFrameworkInClasspath) {
      throw new IllegalArgumentException(
          "Could not locate MapReduce framework name '" + frameworkName
          + "' in " + MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
    }
    // TODO: Remove duplicates.
  }
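  // Illustrative framework configuration (example values, not defaults):
  //   mapreduce.application.framework.path =
  //       hdfs:/mapred/framework/hadoop-mapreduce.tar.gz#mrframework
  // getMRFrameworkName() returns the URI fragment "mrframework"; with a
  // framework configured, setMRFrameworkClasspath() skips the standard YARN
  // classpath and requires that name to appear somewhere in
  // mapreduce.application.classpath.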
  @SuppressWarnings("deprecation")
  public static void setClasspath(Map<String, String> environment,
      Configuration conf) throws IOException {
    boolean userClassesTakesPrecedence =
        conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

    String classpathEnvVar =
        conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
            ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();

    String hadoopClasspathEnvVar = Environment.HADOOP_CLASSPATH.name();

    MRApps.addToEnvironment(environment,
        classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);

    MRApps.addToEnvironment(environment,
        hadoopClasspathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD),
        conf);

    if (!userClassesTakesPrecedence) {
      MRApps.setMRFrameworkClasspath(environment, conf);
    }

    addClasspathToEnv(environment, classpathEnvVar, conf);
    addClasspathToEnv(environment, hadoopClasspathEnvVar, conf);

    if (userClassesTakesPrecedence) {
      MRApps.setMRFrameworkClasspath(environment, conf);
    }
  }

  @SuppressWarnings("deprecation")
  public static void addClasspathToEnv(Map<String, String> environment,
      String classpathEnvVar, Configuration conf) throws IOException {
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
        conf);

    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
        conf);

    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*",
        conf);

    // A * in the classpath will only find a .jar, so we need to filter out
    // all .jars and add everything else.
    addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
        DistributedCache.getCacheFiles(conf),
        conf,
        environment, classpathEnvVar);
    addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
        DistributedCache.getCacheArchives(conf),
        conf,
        environment, classpathEnvVar);
  }
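  // Illustrative CLASSPATH assembly order with the defaults above (no
  // user-classpath-first, no job classloader):
  //   $PWD : <yarn.application.classpath entries> :
  //   <mapreduce.application.classpath entries> : job.jar/job.jar :
  //   job.jar/classes/ : job.jar/lib/* : $PWD/* : <non-jar cache entries>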
  /**
   * Add the paths to the classpath if they are not jars.
   * @param paths the paths to add to the classpath
   * @param withLinks the corresponding paths that may have a link name in them
   * @param conf used to resolve the paths
   * @param environment the environment to update CLASSPATH in
   * @param classpathEnvVar the name of the classpath environment variable
   * @throws IOException if there is an error resolving any of the paths.
   */
  private static void addToClasspathIfNotJar(Path[] paths,
      URI[] withLinks, Configuration conf,
      Map<String, String> environment,
      String classpathEnvVar) throws IOException {
    if (paths != null) {
      HashMap<Path, String> linkLookup = new HashMap<Path, String>();
      if (withLinks != null) {
        for (URI u : withLinks) {
          Path p = new Path(u);
          FileSystem remoteFS = p.getFileSystem(conf);
          p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
              remoteFS.getWorkingDirectory()));
          String name = (null == u.getFragment())
              ? p.getName() : u.getFragment();
          if (!StringUtils.toLowerCase(name).endsWith(".jar")) {
            linkLookup.put(p, name);
          }
        }
      }

      for (Path p : paths) {
        FileSystem remoteFS = p.getFileSystem(conf);
        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
            remoteFS.getWorkingDirectory()));
        String name = linkLookup.get(p);
        if (name == null) {
          name = p.getName();
        }
        if (!StringUtils.toLowerCase(name).endsWith(".jar")) {
          MRApps.addToEnvironment(
              environment,
              classpathEnvVar,
              crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR
                  + name, conf);
        }
      }
    }
  }

  /**
   * Creates and sets an {@link ApplicationClassLoader} on the given
   * configuration and as the thread context classloader, if
   * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
   * the APP_CLASSPATH environment variable is set.
   * @param conf the job configuration
   * @throws IOException if the classloader cannot be created
   */
  public static void setJobClassLoader(Configuration conf)
      throws IOException {
    setClassLoader(createJobClassLoader(conf), conf);
  }

  /**
   * Creates an {@link ApplicationClassLoader} if
   * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
   * the APP_CLASSPATH environment variable is set.
   * @param conf the job configuration
   * @return the created job classloader, or null if the job classloader is not
   * enabled or the APP_CLASSPATH environment variable is not set
   * @throws IOException if the classloader cannot be created
   */
  public static ClassLoader createJobClassLoader(Configuration conf)
      throws IOException {
    ClassLoader jobClassLoader = null;
    if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
      String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
      if (appClasspath == null) {
        LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
      } else {
        LOG.info("Creating job classloader");
        if (LOG.isDebugEnabled()) {
          LOG.debug("APP_CLASSPATH=" + appClasspath);
        }
        String[] systemClasses = getSystemClasses(conf);
        jobClassLoader = createJobClassLoader(appClasspath,
            systemClasses);
      }
    }
    return jobClassLoader;
  }

  /**
   * Sets the provided classloader on the given configuration and as the thread
   * context classloader if the classloader is not null.
   * @param classLoader the classloader to set
   * @param conf the configuration to update
   */
  public static void setClassLoader(ClassLoader classLoader,
      Configuration conf) {
    if (classLoader != null) {
      LOG.info("Setting classloader " + classLoader.getClass().getName() +
          " on the configuration and as the thread context classloader");
      conf.setClassLoader(classLoader);
      Thread.currentThread().setContextClassLoader(classLoader);
    }
  }

  @VisibleForTesting
  static String[] getSystemClasses(Configuration conf) {
    return conf.getTrimmedStrings(
        MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
  }

  private static ClassLoader createJobClassLoader(final String appClasspath,
      final String[] systemClasses) throws IOException {
    try {
      return AccessController.doPrivileged(
          new PrivilegedExceptionAction<ClassLoader>() {
            @Override
            public ClassLoader run() throws MalformedURLException {
              return new ApplicationClassLoader(appClasspath,
                  MRApps.class.getClassLoader(), Arrays.asList(systemClasses));
            }
          });
    } catch (PrivilegedActionException e) {
      Throwable t = e.getCause();
      if (t instanceof MalformedURLException) {
        throw (MalformedURLException) t;
      }
      throw new IOException(e);
    }
  }

  private static final String STAGING_CONSTANT = ".staging";

  public static Path getStagingAreaDir(Configuration conf, String user) {
    return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
        MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
        + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
  }

  public static String getJobFile(Configuration conf, String user,
      org.apache.hadoop.mapreduce.JobID jobId) {
    Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
    return jobFile.toString();
  }

  public static Path getEndJobCommitSuccessFile(Configuration conf,
      String user, JobId jobId) {
    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS");
    return endCommitFile;
  }

  public static Path getEndJobCommitFailureFile(Configuration conf,
      String user, JobId jobId) {
    Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL");
    return endCommitFile;
  }

  public static Path getStartJobCommitFile(Configuration conf, String user,
      JobId jobId) {
    Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
        jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED");
    return startCommitFile;
  }
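  // Illustrative staging layout assuming the default
  // yarn.app.mapreduce.am.staging-dir of /tmp/hadoop-yarn/staging:
  //   /tmp/hadoop-yarn/staging/<user>/.staging/<jobId>/job.xml
  //   /tmp/hadoop-yarn/staging/<user>/.staging/<jobId>/COMMIT_STARTED
  //   /tmp/hadoop-yarn/staging/<user>/.staging/<jobId>/COMMIT_SUCCESS (or COMMIT_FAIL)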
  @SuppressWarnings("deprecation")
  public static void setupDistributedCache(
      Configuration conf,
      Map<String, LocalResource> localResources)
      throws IOException {

    // Cache archives
    parseDistributedCacheArtifacts(conf, localResources,
        LocalResourceType.ARCHIVE,
        DistributedCache.getCacheArchives(conf),
        DistributedCache.getArchiveTimestamps(conf),
        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
        DistributedCache.getArchiveVisibilities(conf));

    // Cache files
    parseDistributedCacheArtifacts(conf,
        localResources,
        LocalResourceType.FILE,
        DistributedCache.getCacheFiles(conf),
        DistributedCache.getFileTimestamps(conf),
        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
        DistributedCache.getFileVisibilities(conf));
  }

  /**
   * Set up the DistributedCache related configs to make
   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
   * and
   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
   * work.
   * @param conf the configuration to update
   * @throws java.io.IOException if the cache URIs cannot be read
   */
  @SuppressWarnings("deprecation")
  public static void setupDistributedCacheLocal(Configuration conf)
      throws IOException {

    // All symlinks are created in the current work-dir.
    String localWorkDir = System.getenv("PWD");

    // Update the configuration object with localized archives.
    URI[] cacheArchives = DistributedCache.getCacheArchives(conf);
    if (cacheArchives != null) {
      List<String> localArchives = new ArrayList<String>();
      for (int i = 0; i < cacheArchives.length; ++i) {
        URI u = cacheArchives[i];
        Path p = new Path(u);
        Path name =
            new Path((null == u.getFragment()) ? p.getName()
                : u.getFragment());
        String linkName = name.toUri().getPath();
        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
      }
      if (!localArchives.isEmpty()) {
        conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
            .arrayToString(localArchives.toArray(new String[localArchives
                .size()])));
      }
    }

    // Update the configuration object with localized files.
    URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
    if (cacheFiles != null) {
      List<String> localFiles = new ArrayList<String>();
      for (int i = 0; i < cacheFiles.length; ++i) {
        URI u = cacheFiles[i];
        Path p = new Path(u);
        Path name =
            new Path((null == u.getFragment()) ? p.getName()
                : u.getFragment());
        String linkName = name.toUri().getPath();
        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
      }
      if (!localFiles.isEmpty()) {
        conf.set(MRJobConfig.CACHE_LOCALFILES,
            StringUtils.arrayToString(localFiles
                .toArray(new String[localFiles.size()])));
      }
    }
  }

  private static String getResourceDescription(LocalResourceType type) {
    if (type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
    }
    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
  }

  private static String toString(org.apache.hadoop.yarn.api.records.URL url) {
    StringBuffer b = new StringBuffer();
    b.append(url.getScheme()).append("://").append(url.getHost());
    if (url.getPort() >= 0) {
      b.append(":").append(url.getPort());
    }
    b.append(url.getFile());
    return b.toString();
  }
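  // Link-name resolution used throughout this class (illustrative): for the
  // cache URI hdfs:///apps/dict.txt#mydict the link name is the fragment
  // "mydict"; without a fragment it defaults to the file name "dict.txt".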
  // TODO - Move this to MR!
  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
  // long[], boolean[], Path[], FileType)
  @SuppressWarnings("deprecation")
  private static void parseDistributedCacheArtifacts(
      Configuration conf,
      Map<String, LocalResource> localResources,
      LocalResourceType type,
      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
      throws IOException {

    if (uris != null) {
      // Sanity check
      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
          (uris.length != visibilities.length)) {
        throw new IllegalArgumentException("Invalid specification for " +
            "distributed-cache artifacts of type " + type + " :" +
            " #uris=" + uris.length +
            " #timestamps=" + timestamps.length +
            " #visibilities=" + visibilities.length
            );
      }

      for (int i = 0; i < uris.length; ++i) {
        URI u = uris[i];
        Path p = new Path(u);
        FileSystem remoteFS = p.getFileSystem(conf);
        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
            remoteFS.getWorkingDirectory()));
        // Add URI fragment or just the filename
        Path name = new Path((null == u.getFragment())
            ? p.getName()
            : u.getFragment());
        if (name.isAbsolute()) {
          throw new IllegalArgumentException("Resource name must be relative");
        }
        String linkName = name.toUri().getPath();
        LocalResource orig = localResources.get(linkName);
        org.apache.hadoop.yarn.api.records.URL url =
            ConverterUtils.getYarnUrlFromURI(p.toUri());
        if (orig != null && !orig.getResource().equals(url)) {
          LOG.warn(
              getResourceDescription(orig.getType()) +
              toString(orig.getResource()) + " conflicts with " +
              getResourceDescription(type) + toString(url) +
              " This will be an error in Hadoop 2.0");
          continue;
        }
        localResources.put(linkName, LocalResource.newInstance(ConverterUtils
            .getYarnUrlFromURI(p.toUri()), type, visibilities[i]
            ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
            sizes[i], timestamps[i]));
      }
    }
  }

  // TODO - Move this to MR!
  private static long[] getFileSizes(Configuration conf, String key) {
    String[] strs = conf.getStrings(key);
    if (strs == null) {
      return null;
    }
    long[] result = new long[strs.length];
    for (int i = 0; i < strs.length; ++i) {
      result[i] = Long.parseLong(strs[i]);
    }
    return result;
  }

  public static String getChildLogLevel(Configuration conf, boolean isMap) {
    if (isMap) {
      return conf.get(
          MRJobConfig.MAP_LOG_LEVEL,
          JobConf.DEFAULT_LOG_LEVEL.toString()
          );
    } else {
      return conf.get(
          MRJobConfig.REDUCE_LOG_LEVEL,
          JobConf.DEFAULT_LOG_LEVEL.toString()
          );
    }
  }

  /**
   * Add the JVM system properties necessary to configure
   * {@link ContainerLogAppender} or
   * {@link ContainerRollingLogAppender}.
   *
   * @param task for map/reduce, or null for app master
   * @param vargs the argument list to append to
   * @param conf configuration of MR job
   */
  public static void addLog4jSystemProperties(Task task,
      List<String> vargs, Configuration conf) {
    String log4jPropertyFile =
        conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, "");
    if (log4jPropertyFile.isEmpty()) {
      vargs.add("-Dlog4j.configuration=container-log4j.properties");
    } else {
      URI log4jURI = null;
      try {
        log4jURI = new URI(log4jPropertyFile);
      } catch (URISyntaxException e) {
        throw new IllegalArgumentException(e);
      }
      Path log4jPath = new Path(log4jURI);
      vargs.add("-Dlog4j.configuration=" + log4jPath.getName());
    }

    long logSize;
    String logLevel;
    int numBackups;

    if (task == null) {
      logSize = conf.getLong(MRJobConfig.MR_AM_LOG_KB,
          MRJobConfig.DEFAULT_MR_AM_LOG_KB) << 10;
      logLevel = conf.get(
          MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
      numBackups = conf.getInt(MRJobConfig.MR_AM_LOG_BACKUPS,
          MRJobConfig.DEFAULT_MR_AM_LOG_BACKUPS);
    } else {
      logSize = TaskLog.getTaskLogLimitBytes(conf);
      logLevel = getChildLogLevel(conf, task.isMapTask());
      numBackups = conf.getInt(MRJobConfig.TASK_LOG_BACKUPS,
          MRJobConfig.DEFAULT_TASK_LOG_BACKUPS);
    }

    vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" +
        ApplicationConstants.LOG_DIR_EXPANSION_VAR);
    vargs.add(
        "-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + logSize);

    if (logSize > 0L && numBackups > 0) {
      // The log should be rolled.
      vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_BACKUPS + "="
          + numBackups);
      vargs.add("-Dhadoop.root.logger=" + logLevel + ",CRLA");
    } else {
      vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
    }
    vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);

    if (task != null
        && !task.isMapTask()
        && conf.getBoolean(MRJobConfig.REDUCE_SEPARATE_SHUFFLE_LOG,
            MRJobConfig.DEFAULT_REDUCE_SEPARATE_SHUFFLE_LOG)) {
      final int numShuffleBackups = conf.getInt(MRJobConfig.SHUFFLE_LOG_BACKUPS,
          MRJobConfig.DEFAULT_SHUFFLE_LOG_BACKUPS);
      final long shuffleLogSize = conf.getLong(MRJobConfig.SHUFFLE_LOG_KB,
          MRJobConfig.DEFAULT_SHUFFLE_LOG_KB) << 10;
      final String shuffleLogger = logLevel
          + (shuffleLogSize > 0L && numShuffleBackups > 0
              ? ",shuffleCRLA"
              : ",shuffleCLA");

      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.logger=" + shuffleLogger);
      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.logfile=" + TaskLog.LogName.SYSLOG + ".shuffle");
      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.log.filesize=" + shuffleLogSize);
      vargs.add("-D" + MRJobConfig.MR_PREFIX
          + "shuffle.log.backups=" + numShuffleBackups);
    }
  }
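  // Illustrative output of addLog4jSystemProperties() for an AM, assuming the
  // defaults (4096 KB log limit, INFO level, 0 backups, hence the non-rolling
  // "CLA" appender):
  //   -Dlog4j.configuration=container-log4j.properties
  //   -Dyarn.app.container.log.dir=<LOG_DIR>
  //   -Dyarn.app.container.log.filesize=4194304
  //   -Dhadoop.root.logger=INFO,CLA
  //   -Dhadoop.root.logfile=syslog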
  public static void setEnvFromInputString(Map<String, String> env,
      String envString, Configuration conf) {
    String classPathSeparator =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
            MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
            ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
    Apps.setEnvFromInputString(env, envString, classPathSeparator);
  }

  @Public
  @Unstable
  public static void addToEnvironment(Map<String, String> environment,
      String variable, String value, Configuration conf) {
    String classPathSeparator =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
            MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
            ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
    Apps.addToEnvironment(environment, variable, value, classPathSeparator);
  }

  public static String crossPlatformifyMREnv(Configuration conf,
      Environment env) {
    boolean crossPlatform =
        conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
            MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
    return crossPlatform ? env.$$() : env.$();
  }
}