1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.tools; 19 20 import java.io.DataInput; 21 import java.io.DataOutput; 22 import java.io.IOException; 23 import java.util.ArrayList; 24 import java.util.List; 25 import java.util.Stack; 26 27 import org.apache.hadoop.conf.Configuration; 28 import org.apache.hadoop.fs.FileStatus; 29 import org.apache.hadoop.fs.FileSystem; 30 import org.apache.hadoop.fs.Path; 31 import org.apache.hadoop.fs.permission.FsPermission; 32 import org.apache.hadoop.io.SequenceFile; 33 import org.apache.hadoop.io.Text; 34 import org.apache.hadoop.io.Writable; 35 import org.apache.hadoop.io.WritableComparable; 36 import org.apache.hadoop.mapred.FileOutputFormat; 37 import org.apache.hadoop.mapred.FileSplit; 38 import org.apache.hadoop.mapred.InputFormat; 39 import org.apache.hadoop.mapred.InputSplit; 40 import org.apache.hadoop.mapred.InvalidInputException; 41 import org.apache.hadoop.mapred.JobClient; 42 import org.apache.hadoop.mapred.JobConf; 43 import org.apache.hadoop.mapred.Mapper; 44 import org.apache.hadoop.mapred.OutputCollector; 45 import org.apache.hadoop.mapred.RecordReader; 46 import org.apache.hadoop.mapred.Reporter; 47 import org.apache.hadoop.mapred.SequenceFileRecordReader; 48 import org.apache.hadoop.mapreduce.JobSubmissionFiles; 49 import org.apache.hadoop.util.StringUtils; 50 import org.apache.hadoop.util.ToolRunner; 51 52 /** 53 * A Map-reduce program to recursively change files properties 54 * such as owner, group and permission. 55 */ 56 public class DistCh extends DistTool { 57 static final String NAME = "distch"; 58 static final String JOB_DIR_LABEL = NAME + ".job.dir"; 59 static final String OP_LIST_LABEL = NAME + ".op.list"; 60 static final String OP_COUNT_LABEL = NAME + ".op.count"; 61 62 static final String USAGE = "java " + DistCh.class.getName() 63 + " [OPTIONS] <path:owner:group:permission>+ " 64 65 + "\n\nThe values of owner, group and permission can be empty." 66 + "\nPermission is a octal number." 67 68 + "\n\nOPTIONS:" 69 + "\n-f <urilist_uri> Use list at <urilist_uri> as src list" 70 + "\n-i Ignore failures" 71 + "\n-log <logdir> Write logs to <logdir>" 72 ; 73 74 private static final long OP_PER_MAP = 1000; 75 private static final int MAX_MAPS_PER_NODE = 20; 76 private static final int SYNC_FILE_MAX = 10; 77 78 static enum Counter { SUCCEED, FAIL } 79 80 static enum Option { 81 IGNORE_FAILURES("-i", NAME + ".ignore.failures"); 82 83 final String cmd, propertyname; 84 Option(String cmd, String propertyname)85 private Option(String cmd, String propertyname) { 86 this.cmd = cmd; 87 this.propertyname = propertyname; 88 } 89 } 90 DistCh(Configuration conf)91 DistCh(Configuration conf) { 92 super(createJobConf(conf)); 93 } 94 createJobConf(Configuration conf)95 private static JobConf createJobConf(Configuration conf) { 96 JobConf jobconf = new JobConf(conf, DistCh.class); 97 jobconf.setJobName(NAME); 98 jobconf.setMapSpeculativeExecution(false); 99 100 jobconf.setInputFormat(ChangeInputFormat.class); 101 jobconf.setOutputKeyClass(Text.class); 102 jobconf.setOutputValueClass(Text.class); 103 104 jobconf.setMapperClass(ChangeFilesMapper.class); 105 jobconf.setNumReduceTasks(0); 106 return jobconf; 107 } 108 109 /** File operations. */ 110 static class FileOperation implements Writable { 111 private Path src; 112 private String owner; 113 private String group; 114 private FsPermission permission; 115 FileOperation()116 FileOperation() {} 117 FileOperation(Path src, FileOperation that)118 FileOperation(Path src, FileOperation that) { 119 this.src = src; 120 this.owner = that.owner; 121 this.group = that.group; 122 this.permission = that.permission; 123 checkState(); 124 } 125 126 /** 127 * path:owner:group:permission 128 * e.g. 129 * /user/foo:foo:bar:700 130 */ FileOperation(String line)131 FileOperation(String line) { 132 try { 133 String[] t = line.split(":", 4); 134 for(int i = 0; i < t.length; i++) { 135 if ("".equals(t[i])) { 136 t[i] = null; 137 } 138 } 139 140 src = new Path(t[0]); 141 owner = t[1]; 142 group = t[2]; 143 permission = t[3] == null? null: 144 new FsPermission(Short.parseShort(t[3], 8)); 145 146 checkState(); 147 } 148 catch(Exception e) { 149 throw (IllegalArgumentException)new IllegalArgumentException( 150 "line=" + line).initCause(e); 151 } 152 } 153 checkState()154 private void checkState() throws IllegalStateException { 155 if (owner == null && group == null && permission == null) { 156 throw new IllegalStateException( 157 "owner == null && group == null && permission == null"); 158 } 159 } 160 161 static final FsPermission FILE_UMASK 162 = FsPermission.createImmutable((short)0111); 163 isDifferent(FileStatus original)164 private boolean isDifferent(FileStatus original) { 165 if (owner != null && !owner.equals(original.getOwner())) { 166 return true; 167 } 168 if (group != null && !group.equals(original.getGroup())) { 169 return true; 170 } 171 if (permission != null) { 172 FsPermission orig = original.getPermission(); 173 return original.isDirectory()? !permission.equals(orig): 174 !permission.applyUMask(FILE_UMASK).equals(orig); 175 } 176 return false; 177 } 178 run(Configuration conf)179 void run(Configuration conf) throws IOException { 180 FileSystem fs = src.getFileSystem(conf); 181 if (permission != null) { 182 fs.setPermission(src, permission); 183 } 184 if (owner != null || group != null) { 185 fs.setOwner(src, owner, group); 186 } 187 } 188 189 /** {@inheritDoc} */ readFields(DataInput in)190 public void readFields(DataInput in) throws IOException { 191 this.src = new Path(Text.readString(in)); 192 owner = DistTool.readString(in); 193 group = DistTool.readString(in); 194 permission = in.readBoolean()? FsPermission.read(in): null; 195 } 196 197 /** {@inheritDoc} */ write(DataOutput out)198 public void write(DataOutput out) throws IOException { 199 Text.writeString(out, src.toString()); 200 DistTool.writeString(out, owner); 201 DistTool.writeString(out, group); 202 203 boolean b = permission != null; 204 out.writeBoolean(b); 205 if (b) {permission.write(out);} 206 } 207 208 /** {@inheritDoc} */ toString()209 public String toString() { 210 return src + ":" + owner + ":" + group + ":" + permission; 211 } 212 } 213 214 /** Responsible for generating splits of the src file list. */ 215 static class ChangeInputFormat implements InputFormat<Text, FileOperation> { 216 /** Do nothing. */ validateInput(JobConf job)217 public void validateInput(JobConf job) {} 218 219 /** 220 * Produce splits such that each is no greater than the quotient of the 221 * total size and the number of splits requested. 222 * @param job The handle to the JobConf object 223 * @param numSplits Number of splits requested 224 */ getSplits(JobConf job, int numSplits )225 public InputSplit[] getSplits(JobConf job, int numSplits 226 ) throws IOException { 227 final int srcCount = job.getInt(OP_COUNT_LABEL, -1); 228 final int targetcount = srcCount / numSplits; 229 String srclist = job.get(OP_LIST_LABEL, ""); 230 if (srcCount < 0 || "".equals(srclist)) { 231 throw new RuntimeException("Invalid metadata: #files(" + srcCount + 232 ") listuri(" + srclist + ")"); 233 } 234 Path srcs = new Path(srclist); 235 FileSystem fs = srcs.getFileSystem(job); 236 237 List<FileSplit> splits = new ArrayList<FileSplit>(numSplits); 238 239 Text key = new Text(); 240 FileOperation value = new FileOperation(); 241 long prev = 0L; 242 int count = 0; //count src 243 try (SequenceFile.Reader in = new SequenceFile.Reader(fs, srcs, job)) { 244 for ( ; in.next(key, value); ) { 245 long curr = in.getPosition(); 246 long delta = curr - prev; 247 if (++count > targetcount) { 248 count = 0; 249 splits.add(new FileSplit(srcs, prev, delta, (String[])null)); 250 prev = curr; 251 } 252 } 253 } 254 long remaining = fs.getFileStatus(srcs).getLen() - prev; 255 if (remaining != 0) { 256 splits.add(new FileSplit(srcs, prev, remaining, (String[])null)); 257 } 258 LOG.info("numSplits=" + numSplits + ", splits.size()=" + splits.size()); 259 return splits.toArray(new FileSplit[splits.size()]); 260 } 261 262 /** {@inheritDoc} */ getRecordReader(InputSplit split, JobConf job, Reporter reporter)263 public RecordReader<Text, FileOperation> getRecordReader(InputSplit split, 264 JobConf job, Reporter reporter) throws IOException { 265 return new SequenceFileRecordReader<Text, FileOperation>(job, 266 (FileSplit)split); 267 } 268 } 269 270 /** The mapper for changing files. */ 271 static class ChangeFilesMapper 272 implements Mapper<Text, FileOperation, WritableComparable<?>, Text> { 273 private JobConf jobconf; 274 private boolean ignoreFailures; 275 276 private int failcount = 0; 277 private int succeedcount = 0; 278 getCountString()279 private String getCountString() { 280 return "Succeeded: " + succeedcount + " Failed: " + failcount; 281 } 282 283 /** {@inheritDoc} */ configure(JobConf job)284 public void configure(JobConf job) { 285 this.jobconf = job; 286 ignoreFailures=job.getBoolean(Option.IGNORE_FAILURES.propertyname,false); 287 } 288 289 /** Run a FileOperation */ map(Text key, FileOperation value, OutputCollector<WritableComparable<?>, Text> out, Reporter reporter )290 public void map(Text key, FileOperation value, 291 OutputCollector<WritableComparable<?>, Text> out, Reporter reporter 292 ) throws IOException { 293 try { 294 value.run(jobconf); 295 ++succeedcount; 296 reporter.incrCounter(Counter.SUCCEED, 1); 297 } catch (IOException e) { 298 ++failcount; 299 reporter.incrCounter(Counter.FAIL, 1); 300 301 String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e); 302 out.collect(null, new Text(s)); 303 LOG.info(s); 304 } finally { 305 reporter.setStatus(getCountString()); 306 } 307 } 308 309 /** {@inheritDoc} */ close()310 public void close() throws IOException { 311 if (failcount == 0 || ignoreFailures) { 312 return; 313 } 314 throw new IOException(getCountString()); 315 } 316 } 317 check(Configuration conf, List<FileOperation> ops )318 private static void check(Configuration conf, List<FileOperation> ops 319 ) throws InvalidInputException { 320 List<Path> srcs = new ArrayList<Path>(); 321 for(FileOperation op : ops) { 322 srcs.add(op.src); 323 } 324 DistTool.checkSource(conf, srcs); 325 } 326 fetchList(Configuration conf, Path inputfile )327 private static List<FileOperation> fetchList(Configuration conf, Path inputfile 328 ) throws IOException { 329 List<FileOperation> result = new ArrayList<FileOperation>(); 330 for(String line : readFile(conf, inputfile)) { 331 result.add(new FileOperation(line)); 332 } 333 return result; 334 } 335 336 /** This is the main driver for recursively changing files properties. */ run(String[] args)337 public int run(String[] args) throws Exception { 338 List<FileOperation> ops = new ArrayList<FileOperation>(); 339 Path logpath = null; 340 boolean isIgnoreFailures = false; 341 342 try { 343 for (int idx = 0; idx < args.length; idx++) { 344 if ("-f".equals(args[idx])) { 345 if (++idx == args.length) { 346 System.out.println("urilist_uri not specified"); 347 System.out.println(USAGE); 348 return -1; 349 } 350 ops.addAll(fetchList(jobconf, new Path(args[idx]))); 351 } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) { 352 isIgnoreFailures = true; 353 } else if ("-log".equals(args[idx])) { 354 if (++idx == args.length) { 355 System.out.println("logdir not specified"); 356 System.out.println(USAGE); 357 return -1; 358 } 359 logpath = new Path(args[idx]); 360 } else if ('-' == args[idx].codePointAt(0)) { 361 System.out.println("Invalid switch " + args[idx]); 362 System.out.println(USAGE); 363 ToolRunner.printGenericCommandUsage(System.out); 364 return -1; 365 } else { 366 ops.add(new FileOperation(args[idx])); 367 } 368 } 369 // mandatory command-line parameters 370 if (ops.isEmpty()) { 371 throw new IllegalStateException("Operation is empty"); 372 } 373 LOG.info("ops=" + ops); 374 LOG.info("isIgnoreFailures=" + isIgnoreFailures); 375 jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures); 376 check(jobconf, ops); 377 378 try { 379 if (setup(ops, logpath)) { 380 JobClient.runJob(jobconf); 381 } 382 } finally { 383 try { 384 if (logpath == null) { 385 //delete log directory 386 final Path logdir = FileOutputFormat.getOutputPath(jobconf); 387 if (logdir != null) { 388 logdir.getFileSystem(jobconf).delete(logdir, true); 389 } 390 } 391 } 392 finally { 393 //delete job directory 394 final String jobdir = jobconf.get(JOB_DIR_LABEL); 395 if (jobdir != null) { 396 final Path jobpath = new Path(jobdir); 397 jobpath.getFileSystem(jobconf).delete(jobpath, true); 398 } 399 } 400 } 401 } catch(DuplicationException e) { 402 LOG.error("Input error:", e); 403 return DuplicationException.ERROR_CODE; 404 } catch(Exception e) { 405 LOG.error(NAME + " failed: ", e); 406 System.out.println(USAGE); 407 ToolRunner.printGenericCommandUsage(System.out); 408 return -1; 409 } 410 return 0; 411 } 412 413 /** Calculate how many maps to run. */ getMapCount(int srcCount, int numNodes)414 private static int getMapCount(int srcCount, int numNodes) { 415 int numMaps = (int)(srcCount / OP_PER_MAP); 416 numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE); 417 return Math.max(numMaps, 1); 418 } 419 setup(List<FileOperation> ops, Path log)420 private boolean setup(List<FileOperation> ops, Path log) 421 throws IOException { 422 final String randomId = getRandomId(); 423 JobClient jClient = new JobClient(jobconf); 424 Path stagingArea; 425 try { 426 stagingArea = JobSubmissionFiles.getStagingDir( 427 jClient.getClusterHandle(), jobconf); 428 } catch (InterruptedException ie){ 429 throw new IOException(ie); 430 } 431 Path jobdir = new Path(stagingArea + NAME + "_" + randomId); 432 FsPermission mapredSysPerms = 433 new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); 434 FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms); 435 LOG.info(JOB_DIR_LABEL + "=" + jobdir); 436 437 if (log == null) { 438 log = new Path(jobdir, "_logs"); 439 } 440 FileOutputFormat.setOutputPath(jobconf, log); 441 LOG.info("log=" + log); 442 443 //create operation list 444 FileSystem fs = jobdir.getFileSystem(jobconf); 445 Path opList = new Path(jobdir, "_" + OP_LIST_LABEL); 446 jobconf.set(OP_LIST_LABEL, opList.toString()); 447 int opCount = 0, synCount = 0; 448 try (SequenceFile.Writer opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class, 449 FileOperation.class, SequenceFile.CompressionType.NONE)) { 450 for(FileOperation op : ops) { 451 FileStatus srcstat = fs.getFileStatus(op.src); 452 if (srcstat.isDirectory() && op.isDifferent(srcstat)) { 453 ++opCount; 454 opWriter.append(new Text(op.src.toString()), op); 455 } 456 457 Stack<Path> pathstack = new Stack<Path>(); 458 for(pathstack.push(op.src); !pathstack.empty(); ) { 459 for(FileStatus stat : fs.listStatus(pathstack.pop())) { 460 if (stat.isDirectory()) { 461 pathstack.push(stat.getPath()); 462 } 463 464 if (op.isDifferent(stat)) { 465 ++opCount; 466 if (++synCount > SYNC_FILE_MAX) { 467 opWriter.sync(); 468 synCount = 0; 469 } 470 Path f = stat.getPath(); 471 opWriter.append(new Text(f.toString()), new FileOperation(f, op)); 472 } 473 } 474 } 475 } 476 } 477 478 checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf); 479 jobconf.setInt(OP_COUNT_LABEL, opCount); 480 LOG.info(OP_COUNT_LABEL + "=" + opCount); 481 jobconf.setNumMapTasks(getMapCount(opCount, 482 new JobClient(jobconf).getClusterStatus().getTaskTrackers())); 483 return opCount != 0; 484 } 485 checkDuplication(FileSystem fs, Path file, Path sorted, Configuration conf)486 private static void checkDuplication(FileSystem fs, Path file, Path sorted, 487 Configuration conf) throws IOException { 488 SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, 489 new Text.Comparator(), Text.class, FileOperation.class, conf); 490 sorter.sort(file, sorted); 491 try (SequenceFile.Reader in = new SequenceFile.Reader(fs, sorted, conf)) { 492 FileOperation curop = new FileOperation(); 493 Text prevsrc = null, cursrc = new Text(); 494 for(; in.next(cursrc, curop); ) { 495 if (prevsrc != null && cursrc.equals(prevsrc)) { 496 throw new DuplicationException( 497 "Invalid input, there are duplicated files in the sources: " 498 + prevsrc + ", " + cursrc); 499 } 500 prevsrc = cursrc; 501 cursrc = new Text(); 502 curop = new FileOperation(); 503 } 504 } 505 } 506 main(String[] args)507 public static void main(String[] args) throws Exception { 508 System.exit(ToolRunner.run(new DistCh(new Configuration()), args)); 509 } 510 }