1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.tools; 20 21 import java.io.BufferedReader; 22 import java.io.DataInput; 23 import java.io.DataOutput; 24 import java.io.FileNotFoundException; 25 import java.io.IOException; 26 import java.io.InputStreamReader; 27 import java.util.ArrayList; 28 import java.util.EnumSet; 29 import java.util.Iterator; 30 import java.util.List; 31 import java.util.Random; 32 import java.util.Stack; 33 import java.util.StringTokenizer; 34 35 import org.apache.commons.logging.Log; 36 import org.apache.commons.logging.LogFactory; 37 import org.apache.hadoop.conf.Configuration; 38 import org.apache.hadoop.fs.FSDataInputStream; 39 import org.apache.hadoop.fs.FSDataOutputStream; 40 import org.apache.hadoop.fs.FileChecksum; 41 import org.apache.hadoop.fs.FileStatus; 42 import org.apache.hadoop.fs.FileSystem; 43 import org.apache.hadoop.fs.FsShell; 44 import org.apache.hadoop.fs.Path; 45 import org.apache.hadoop.fs.permission.FsPermission; 46 import org.apache.hadoop.hdfs.HftpFileSystem; 47 import org.apache.hadoop.hdfs.protocol.QuotaExceededException; 48 import org.apache.hadoop.io.LongWritable; 49 import org.apache.hadoop.io.SequenceFile; 50 import org.apache.hadoop.io.Text; 51 import org.apache.hadoop.io.Writable; 52 import org.apache.hadoop.io.WritableComparable; 53 import org.apache.hadoop.ipc.RemoteException; 54 import org.apache.hadoop.mapred.FileOutputFormat; 55 import org.apache.hadoop.mapred.FileSplit; 56 import org.apache.hadoop.mapred.InputFormat; 57 import org.apache.hadoop.mapred.InputSplit; 58 import org.apache.hadoop.mapred.InvalidInputException; 59 import org.apache.hadoop.mapred.JobClient; 60 import org.apache.hadoop.mapred.JobConf; 61 import org.apache.hadoop.mapred.JobTracker; 62 import org.apache.hadoop.mapred.Mapper; 63 import org.apache.hadoop.mapred.OutputCollector; 64 import org.apache.hadoop.mapred.RecordReader; 65 import org.apache.hadoop.mapred.Reporter; 66 import org.apache.hadoop.mapred.SequenceFileRecordReader; 67 import org.apache.hadoop.mapreduce.JobSubmissionFiles; 68 import org.apache.hadoop.mapreduce.security.TokenCache; 69 import org.apache.hadoop.security.AccessControlException; 70 import org.apache.hadoop.util.StringUtils; 71 import org.apache.hadoop.util.Tool; 72 import org.apache.hadoop.util.ToolRunner; 73 74 /** 75 * A Map-reduce program to recursively copy directories between 76 * different file-systems. 77 */ 78 public class DistCp implements Tool { 79 public static final Log LOG = LogFactory.getLog(DistCp.class); 80 81 private static final String NAME = "distcp"; 82 83 private static final String usage = NAME 84 + " [OPTIONS] <srcurl>* <desturl>" + 85 "\n\nOPTIONS:" + 86 "\n-p[rbugp] Preserve status" + 87 "\n r: replication number" + 88 "\n b: block size" + 89 "\n u: user" + 90 "\n g: group" + 91 "\n p: permission" + 92 "\n -p alone is equivalent to -prbugp" + 93 "\n-i Ignore failures" + 94 "\n-log <logdir> Write logs to <logdir>" + 95 "\n-m <num_maps> Maximum number of simultaneous copies" + 96 "\n-overwrite Overwrite destination" + 97 "\n-update Overwrite if src size different from dst size" + 98 "\n-skipcrccheck Do not use CRC check to determine if src is " + 99 "\n different from dest. Relevant only if -update" + 100 "\n is specified" + 101 "\n-f <urilist_uri> Use list at <urilist_uri> as src list" + 102 "\n-filelimit <n> Limit the total number of files to be <= n" + 103 "\n-sizelimit <n> Limit the total size to be <= n bytes" + 104 "\n-delete Delete the files existing in the dst but not in src" + 105 "\n-mapredSslConf <f> Filename of SSL configuration for mapper task" + 106 107 "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " + 108 "\n interpreted as an isomorphic update to an existing directory." + 109 "\nFor example:" + 110 "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " + 111 "\"hdfs://B:8020/user/foo/baz\"\n" + 112 "\n would update all descendants of 'baz' also in 'bar'; it would " + 113 "\n *not* update /user/foo/baz/bar" + 114 115 "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " + 116 "\n specified with symbolic representation. For examples," + 117 "\n 1230k = 1230 * 1024 = 1259520" + 118 "\n 891g = 891 * 1024^3 = 956703965184" + 119 120 "\n"; 121 122 private static final long BYTES_PER_MAP = 256 * 1024 * 1024; 123 private static final int MAX_MAPS_PER_NODE = 20; 124 private static final int SYNC_FILE_MAX = 10; 125 126 static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED } 127 static enum Options { 128 DELETE("-delete", NAME + ".delete"), 129 FILE_LIMIT("-filelimit", NAME + ".limit.file"), 130 SIZE_LIMIT("-sizelimit", NAME + ".limit.size"), 131 IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"), 132 PRESERVE_STATUS("-p", NAME + ".preserve.status"), 133 OVERWRITE("-overwrite", NAME + ".overwrite.always"), 134 UPDATE("-update", NAME + ".overwrite.ifnewer"), 135 SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check"); 136 137 final String cmd, propertyname; 138 Options(String cmd, String propertyname)139 private Options(String cmd, String propertyname) { 140 this.cmd = cmd; 141 this.propertyname = propertyname; 142 } 143 parseLong(String[] args, int offset)144 private long parseLong(String[] args, int offset) { 145 if (offset == args.length) { 146 throw new IllegalArgumentException("<n> not specified in " + cmd); 147 } 148 long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]); 149 if (n <= 0) { 150 throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd); 151 } 152 return n; 153 } 154 } 155 static enum FileAttribute { 156 BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION; 157 158 final char symbol; 159 FileAttribute()160 private FileAttribute() {symbol = toString().toLowerCase().charAt(0);} 161 parse(String s)162 static EnumSet<FileAttribute> parse(String s) { 163 if (s == null || s.length() == 0) { 164 return EnumSet.allOf(FileAttribute.class); 165 } 166 167 EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class); 168 FileAttribute[] attributes = values(); 169 for(char c : s.toCharArray()) { 170 int i = 0; 171 for(; i < attributes.length && c != attributes[i].symbol; i++); 172 if (i < attributes.length) { 173 if (!set.contains(attributes[i])) { 174 set.add(attributes[i]); 175 } else { 176 throw new IllegalArgumentException("There are more than one '" 177 + attributes[i].symbol + "' in " + s); 178 } 179 } else { 180 throw new IllegalArgumentException("'" + c + "' in " + s 181 + " is undefined."); 182 } 183 } 184 return set; 185 } 186 } 187 188 static final String TMP_DIR_LABEL = NAME + ".tmp.dir"; 189 static final String DST_DIR_LABEL = NAME + ".dest.path"; 190 static final String JOB_DIR_LABEL = NAME + ".job.dir"; 191 static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks"; 192 static final String SRC_LIST_LABEL = NAME + ".src.list"; 193 static final String SRC_COUNT_LABEL = NAME + ".src.count"; 194 static final String TOTAL_SIZE_LABEL = NAME + ".total.size"; 195 static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list"; 196 static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map"; 197 static final String PRESERVE_STATUS_LABEL 198 = Options.PRESERVE_STATUS.propertyname + ".value"; 199 200 private JobConf conf; 201 setConf(Configuration conf)202 public void setConf(Configuration conf) { 203 if (conf instanceof JobConf) { 204 this.conf = (JobConf) conf; 205 } else { 206 this.conf = new JobConf(conf); 207 } 208 } 209 getConf()210 public Configuration getConf() { 211 return conf; 212 } 213 DistCp(Configuration conf)214 public DistCp(Configuration conf) { 215 setConf(conf); 216 } 217 218 /** 219 * An input/output pair of filenames. 220 */ 221 static class FilePair implements Writable { 222 FileStatus input = new FileStatus(); 223 String output; FilePair()224 FilePair() { } FilePair(FileStatus input, String output)225 FilePair(FileStatus input, String output) { 226 this.input = input; 227 this.output = output; 228 } readFields(DataInput in)229 public void readFields(DataInput in) throws IOException { 230 input.readFields(in); 231 output = Text.readString(in); 232 } write(DataOutput out)233 public void write(DataOutput out) throws IOException { 234 input.write(out); 235 Text.writeString(out, output); 236 } toString()237 public String toString() { 238 return input + " : " + output; 239 } 240 } 241 242 /** 243 * InputFormat of a distcp job responsible for generating splits of the src 244 * file list. 245 */ 246 static class CopyInputFormat implements InputFormat<Text, Text> { 247 248 /** 249 * Produce splits such that each is no greater than the quotient of the 250 * total size and the number of splits requested. 251 * @param job The handle to the JobConf object 252 * @param numSplits Number of splits requested 253 */ getSplits(JobConf job, int numSplits)254 public InputSplit[] getSplits(JobConf job, int numSplits) 255 throws IOException { 256 int cnfiles = job.getInt(SRC_COUNT_LABEL, -1); 257 long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1); 258 String srcfilelist = job.get(SRC_LIST_LABEL, ""); 259 if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) { 260 throw new RuntimeException("Invalid metadata: #files(" + cnfiles + 261 ") total_size(" + cbsize + ") listuri(" + 262 srcfilelist + ")"); 263 } 264 Path src = new Path(srcfilelist); 265 FileSystem fs = src.getFileSystem(job); 266 FileStatus srcst = fs.getFileStatus(src); 267 268 ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); 269 LongWritable key = new LongWritable(); 270 FilePair value = new FilePair(); 271 final long targetsize = cbsize / numSplits; 272 long pos = 0L; 273 long last = 0L; 274 long acc = 0L; 275 long cbrem = srcst.getLen(); 276 SequenceFile.Reader sl = null; 277 try { 278 sl = new SequenceFile.Reader(fs, src, job); 279 for (; sl.next(key, value); last = sl.getPosition()) { 280 // if adding this split would put this split past the target size, 281 // cut the last split and put this next file in the next split. 282 if (acc + key.get() > targetsize && acc != 0) { 283 long splitsize = last - pos; 284 splits.add(new FileSplit(src, pos, splitsize, (String[])null)); 285 cbrem -= splitsize; 286 pos = last; 287 acc = 0L; 288 } 289 acc += key.get(); 290 } 291 } 292 finally { 293 checkAndClose(sl); 294 } 295 if (cbrem != 0) { 296 splits.add(new FileSplit(src, pos, cbrem, (String[])null)); 297 } 298 299 return splits.toArray(new FileSplit[splits.size()]); 300 } 301 302 /** 303 * Returns a reader for this split of the src file list. 304 */ getRecordReader(InputSplit split, JobConf job, Reporter reporter)305 public RecordReader<Text, Text> getRecordReader(InputSplit split, 306 JobConf job, Reporter reporter) throws IOException { 307 return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split); 308 } 309 } 310 311 /** 312 * FSCopyFilesMapper: The mapper for copying files between FileSystems. 313 */ 314 static class CopyFilesMapper 315 implements Mapper<LongWritable, FilePair, WritableComparable<?>, Text> { 316 // config 317 private int sizeBuf = 128 * 1024; 318 private FileSystem destFileSys = null; 319 private boolean ignoreReadFailures; 320 private boolean preserve_status; 321 private EnumSet<FileAttribute> preseved; 322 private boolean overwrite; 323 private boolean update; 324 private Path destPath = null; 325 private byte[] buffer = null; 326 private JobConf job; 327 private boolean skipCRCCheck = false; 328 329 // stats 330 private int failcount = 0; 331 private int skipcount = 0; 332 private int copycount = 0; 333 getCountString()334 private String getCountString() { 335 return "Copied: " + copycount + " Skipped: " + skipcount 336 + " Failed: " + failcount; 337 } updateStatus(Reporter reporter)338 private void updateStatus(Reporter reporter) { 339 reporter.setStatus(getCountString()); 340 } 341 342 /** 343 * Return true if dst should be replaced by src and the update flag is set. 344 * Right now, this merely checks that the src and dst len are not equal. 345 * This should be improved on once modification times, CRCs, etc. can 346 * be meaningful in this context. 347 * @throws IOException 348 */ needsUpdate(FileStatus srcstatus, FileSystem dstfs, Path dstpath)349 private boolean needsUpdate(FileStatus srcstatus, 350 FileSystem dstfs, Path dstpath) throws IOException { 351 return update && !sameFile(srcstatus.getPath().getFileSystem(job), 352 srcstatus, dstfs, dstpath, skipCRCCheck); 353 } 354 create(Path f, Reporter reporter, FileStatus srcstat)355 private FSDataOutputStream create(Path f, Reporter reporter, 356 FileStatus srcstat) throws IOException { 357 if (destFileSys.exists(f)) { 358 destFileSys.delete(f, false); 359 } 360 if (!preserve_status) { 361 return destFileSys.create(f, true, sizeBuf, reporter); 362 } 363 364 FsPermission permission = preseved.contains(FileAttribute.PERMISSION)? 365 srcstat.getPermission(): null; 366 short replication = preseved.contains(FileAttribute.REPLICATION)? 367 srcstat.getReplication(): destFileSys.getDefaultReplication(); 368 long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)? 369 srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(); 370 return destFileSys.create(f, permission, true, sizeBuf, replication, 371 blockSize, reporter); 372 } 373 374 /** 375 * Copy a file to a destination. 376 * @param srcstat src path and metadata 377 * @param dstpath dst path 378 * @param reporter 379 */ copy(FileStatus srcstat, Path relativedst, OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)380 private void copy(FileStatus srcstat, Path relativedst, 381 OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter) 382 throws IOException { 383 Path absdst = new Path(destPath, relativedst); 384 int totfiles = job.getInt(SRC_COUNT_LABEL, -1); 385 assert totfiles >= 0 : "Invalid file count " + totfiles; 386 387 // if a directory, ensure created even if empty 388 if (srcstat.isDir()) { 389 if (destFileSys.exists(absdst)) { 390 if (!destFileSys.getFileStatus(absdst).isDir()) { 391 throw new IOException("Failed to mkdirs: " + absdst+" is a file."); 392 } 393 } 394 else if (!destFileSys.mkdirs(absdst)) { 395 throw new IOException("Failed to mkdirs " + absdst); 396 } 397 // TODO: when modification times can be set, directories should be 398 // emitted to reducers so they might be preserved. Also, mkdirs does 399 // not currently return an error when the directory already exists; 400 // if this changes, all directory work might as well be done in reduce 401 return; 402 } 403 404 if (destFileSys.exists(absdst) && !overwrite 405 && !needsUpdate(srcstat, destFileSys, absdst)) { 406 outc.collect(null, new Text("SKIP: " + srcstat.getPath())); 407 ++skipcount; 408 reporter.incrCounter(Counter.SKIP, 1); 409 updateStatus(reporter); 410 return; 411 } 412 413 Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst); 414 long cbcopied = 0L; 415 FSDataInputStream in = null; 416 FSDataOutputStream out = null; 417 try { 418 // open src file 419 in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath()); 420 reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); 421 // open tmp file 422 out = create(tmpfile, reporter, srcstat); 423 // copy file 424 for(int cbread; (cbread = in.read(buffer)) >= 0; ) { 425 out.write(buffer, 0, cbread); 426 cbcopied += cbread; 427 reporter.setStatus( 428 String.format("%.2f ", cbcopied*100.0/srcstat.getLen()) 429 + absdst + " [ " + 430 StringUtils.humanReadableInt(cbcopied) + " / " + 431 StringUtils.humanReadableInt(srcstat.getLen()) + " ]"); 432 } 433 } finally { 434 checkAndClose(in); 435 checkAndClose(out); 436 } 437 438 if (cbcopied != srcstat.getLen()) { 439 throw new IOException("File size not matched: copied " 440 + bytesString(cbcopied) + " to tmpfile (=" + tmpfile 441 + ") but expected " + bytesString(srcstat.getLen()) 442 + " from " + srcstat.getPath()); 443 } 444 else { 445 if (totfiles == 1) { 446 // Copying a single file; use dst path provided by user as destination 447 // rather than destination directory, if a file 448 Path dstparent = absdst.getParent(); 449 if (!(destFileSys.exists(dstparent) && 450 destFileSys.getFileStatus(dstparent).isDir())) { 451 absdst = dstparent; 452 } 453 } 454 if (destFileSys.exists(absdst) && 455 destFileSys.getFileStatus(absdst).isDir()) { 456 throw new IOException(absdst + " is a directory"); 457 } 458 if (!destFileSys.mkdirs(absdst.getParent())) { 459 throw new IOException("Failed to craete parent dir: " + absdst.getParent()); 460 } 461 rename(tmpfile, absdst); 462 463 FileStatus dststat = destFileSys.getFileStatus(absdst); 464 if (dststat.getLen() != srcstat.getLen()) { 465 destFileSys.delete(absdst, false); 466 throw new IOException("File size not matched: copied " 467 + bytesString(dststat.getLen()) + " to dst (=" + absdst 468 + ") but expected " + bytesString(srcstat.getLen()) 469 + " from " + srcstat.getPath()); 470 } 471 updatePermissions(srcstat, dststat); 472 } 473 474 // report at least once for each file 475 ++copycount; 476 reporter.incrCounter(Counter.BYTESCOPIED, cbcopied); 477 reporter.incrCounter(Counter.COPY, 1); 478 updateStatus(reporter); 479 } 480 481 /** rename tmp to dst, delete dst if already exists */ rename(Path tmp, Path dst)482 private void rename(Path tmp, Path dst) throws IOException { 483 try { 484 if (destFileSys.exists(dst)) { 485 destFileSys.delete(dst, true); 486 } 487 if (!destFileSys.rename(tmp, dst)) { 488 throw new IOException(); 489 } 490 } 491 catch(IOException cause) { 492 throw (IOException)new IOException("Fail to rename tmp file (=" + tmp 493 + ") to destination file (=" + dst + ")").initCause(cause); 494 } 495 } 496 updatePermissions(FileStatus src, FileStatus dst )497 private void updatePermissions(FileStatus src, FileStatus dst 498 ) throws IOException { 499 if (preserve_status) { 500 DistCp.updatePermissions(src, dst, preseved, destFileSys); 501 } 502 } 503 bytesString(long b)504 static String bytesString(long b) { 505 return b + " bytes (" + StringUtils.humanReadableInt(b) + ")"; 506 } 507 508 /** Mapper configuration. 509 * Extracts source and destination file system, as well as 510 * top-level paths on source and destination directories. 511 * Gets the named file systems, to be used later in map. 512 */ configure(JobConf job)513 public void configure(JobConf job) 514 { 515 destPath = new Path(job.get(DST_DIR_LABEL, "/")); 516 try { 517 destFileSys = destPath.getFileSystem(job); 518 } catch (IOException ex) { 519 throw new RuntimeException("Unable to get the named file system.", ex); 520 } 521 sizeBuf = job.getInt("copy.buf.size", 128 * 1024); 522 buffer = new byte[sizeBuf]; 523 ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false); 524 preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false); 525 if (preserve_status) { 526 preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL)); 527 } 528 update = job.getBoolean(Options.UPDATE.propertyname, false); 529 overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false); 530 skipCRCCheck = job.getBoolean(Options.SKIPCRC.propertyname, false); 531 this.job = job; 532 } 533 534 /** Map method. Copies one file from source file system to destination. 535 * @param key src len 536 * @param value FilePair (FileStatus src, Path dst) 537 * @param out Log of failed copies 538 * @param reporter 539 */ map(LongWritable key, FilePair value, OutputCollector<WritableComparable<?>, Text> out, Reporter reporter)540 public void map(LongWritable key, 541 FilePair value, 542 OutputCollector<WritableComparable<?>, Text> out, 543 Reporter reporter) throws IOException { 544 final FileStatus srcstat = value.input; 545 final Path relativedst = new Path(value.output); 546 try { 547 copy(srcstat, relativedst, out, reporter); 548 } catch (IOException e) { 549 ++failcount; 550 reporter.incrCounter(Counter.FAIL, 1); 551 updateStatus(reporter); 552 final String sfailure = "FAIL " + relativedst + " : " + 553 StringUtils.stringifyException(e); 554 out.collect(null, new Text(sfailure)); 555 LOG.info(sfailure); 556 try { 557 for (int i = 0; i < 3; ++i) { 558 try { 559 final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst); 560 if (destFileSys.delete(tmp, true)) 561 break; 562 } catch (Throwable ex) { 563 // ignore, we are just cleaning up 564 LOG.debug("Ignoring cleanup exception", ex); 565 } 566 // update status, so we don't get timed out 567 updateStatus(reporter); 568 Thread.sleep(3 * 1000); 569 } 570 } catch (InterruptedException inte) { 571 throw (IOException)new IOException().initCause(inte); 572 } 573 } finally { 574 updateStatus(reporter); 575 } 576 } 577 close()578 public void close() throws IOException { 579 if (0 == failcount || ignoreReadFailures) { 580 return; 581 } 582 throw new IOException(getCountString()); 583 } 584 } 585 fetchFileList(Configuration conf, Path srcList)586 private static List<Path> fetchFileList(Configuration conf, Path srcList) 587 throws IOException { 588 List<Path> result = new ArrayList<Path>(); 589 FileSystem fs = srcList.getFileSystem(conf); 590 BufferedReader input = null; 591 try { 592 input = new BufferedReader(new InputStreamReader(fs.open(srcList))); 593 String line = input.readLine(); 594 while (line != null) { 595 result.add(new Path(line)); 596 line = input.readLine(); 597 } 598 } finally { 599 checkAndClose(input); 600 } 601 return result; 602 } 603 604 @Deprecated copy(Configuration conf, String srcPath, String destPath, Path logPath, boolean srcAsList, boolean ignoreReadFailures)605 public static void copy(Configuration conf, String srcPath, 606 String destPath, Path logPath, 607 boolean srcAsList, boolean ignoreReadFailures) 608 throws IOException { 609 final Path src = new Path(srcPath); 610 List<Path> tmp = new ArrayList<Path>(); 611 if (srcAsList) { 612 tmp.addAll(fetchFileList(conf, src)); 613 } else { 614 tmp.add(src); 615 } 616 EnumSet<Options> flags = ignoreReadFailures 617 ? EnumSet.of(Options.IGNORE_READ_FAILURES) 618 : EnumSet.noneOf(Options.class); 619 620 final Path dst = new Path(destPath); 621 copy(conf, new Arguments(tmp, dst, logPath, flags, null, 622 Long.MAX_VALUE, Long.MAX_VALUE, null)); 623 } 624 625 /** Sanity check for srcPath */ checkSrcPath(JobConf jobConf, List<Path> srcPaths)626 private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths) 627 throws IOException { 628 List<IOException> rslt = new ArrayList<IOException>(); 629 630 Path[] ps = new Path[srcPaths.size()]; 631 ps = srcPaths.toArray(ps); 632 TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf); 633 634 for (Path p : srcPaths) { 635 FileSystem fs = p.getFileSystem(jobConf); 636 if (!fs.exists(p)) { 637 rslt.add(new IOException("Input source " + p + " does not exist.")); 638 } 639 } 640 if (!rslt.isEmpty()) { 641 throw new InvalidInputException(rslt); 642 } 643 } 644 645 /** 646 * Driver to copy srcPath to destPath depending on required protocol. 647 * @param args arguments 648 */ copy(final Configuration conf, final Arguments args )649 static void copy(final Configuration conf, final Arguments args 650 ) throws IOException { 651 LOG.info("srcPaths=" + args.srcs); 652 LOG.info("destPath=" + args.dst); 653 654 JobConf job = createJobConf(conf); 655 656 checkSrcPath(job, args.srcs); 657 if (args.preservedAttributes != null) { 658 job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes); 659 } 660 if (args.mapredSslConf != null) { 661 job.set("dfs.https.client.keystore.resource", args.mapredSslConf); 662 } 663 664 //Initialize the mapper 665 try { 666 if (setup(conf, job, args)) { 667 JobClient.runJob(job); 668 } 669 finalize(conf, job, args.dst, args.preservedAttributes); 670 } finally { 671 //delete tmp 672 fullyDelete(job.get(TMP_DIR_LABEL), job); 673 //delete jobDirectory 674 fullyDelete(job.get(JOB_DIR_LABEL), job); 675 } 676 } 677 updatePermissions(FileStatus src, FileStatus dst, EnumSet<FileAttribute> preseved, FileSystem destFileSys )678 private static void updatePermissions(FileStatus src, FileStatus dst, 679 EnumSet<FileAttribute> preseved, FileSystem destFileSys 680 ) throws IOException { 681 String owner = null; 682 String group = null; 683 if (preseved.contains(FileAttribute.USER) 684 && !src.getOwner().equals(dst.getOwner())) { 685 owner = src.getOwner(); 686 } 687 if (preseved.contains(FileAttribute.GROUP) 688 && !src.getGroup().equals(dst.getGroup())) { 689 group = src.getGroup(); 690 } 691 if (owner != null || group != null) { 692 destFileSys.setOwner(dst.getPath(), owner, group); 693 } 694 if (preseved.contains(FileAttribute.PERMISSION) 695 && !src.getPermission().equals(dst.getPermission())) { 696 destFileSys.setPermission(dst.getPath(), src.getPermission()); 697 } 698 } 699 finalize(Configuration conf, JobConf jobconf, final Path destPath, String presevedAttributes)700 static private void finalize(Configuration conf, JobConf jobconf, 701 final Path destPath, String presevedAttributes) throws IOException { 702 if (presevedAttributes == null) { 703 return; 704 } 705 EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes); 706 if (!preseved.contains(FileAttribute.USER) 707 && !preseved.contains(FileAttribute.GROUP) 708 && !preseved.contains(FileAttribute.PERMISSION)) { 709 return; 710 } 711 712 FileSystem dstfs = destPath.getFileSystem(conf); 713 Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL)); 714 SequenceFile.Reader in = null; 715 try { 716 in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf), 717 dstdirlist, jobconf); 718 Text dsttext = new Text(); 719 FilePair pair = new FilePair(); 720 for(; in.next(dsttext, pair); ) { 721 Path absdst = new Path(destPath, pair.output); 722 updatePermissions(pair.input, dstfs.getFileStatus(absdst), 723 preseved, dstfs); 724 } 725 } finally { 726 checkAndClose(in); 727 } 728 } 729 730 static private class Arguments { 731 final List<Path> srcs; 732 final Path dst; 733 final Path log; 734 final EnumSet<Options> flags; 735 final String preservedAttributes; 736 final long filelimit; 737 final long sizelimit; 738 final String mapredSslConf; 739 740 /** 741 * Arguments for distcp 742 * @param srcs List of source paths 743 * @param dst Destination path 744 * @param log Log output directory 745 * @param flags Command-line flags 746 * @param preservedAttributes Preserved attributes 747 * @param filelimit File limit 748 * @param sizelimit Size limit 749 */ Arguments(List<Path> srcs, Path dst, Path log, EnumSet<Options> flags, String preservedAttributes, long filelimit, long sizelimit, String mapredSslConf)750 Arguments(List<Path> srcs, Path dst, Path log, 751 EnumSet<Options> flags, String preservedAttributes, 752 long filelimit, long sizelimit, String mapredSslConf) { 753 this.srcs = srcs; 754 this.dst = dst; 755 this.log = log; 756 this.flags = flags; 757 this.preservedAttributes = preservedAttributes; 758 this.filelimit = filelimit; 759 this.sizelimit = sizelimit; 760 this.mapredSslConf = mapredSslConf; 761 762 if (LOG.isTraceEnabled()) { 763 LOG.trace("this = " + this); 764 } 765 } 766 valueOf(String[] args, Configuration conf )767 static Arguments valueOf(String[] args, Configuration conf 768 ) throws IOException { 769 List<Path> srcs = new ArrayList<Path>(); 770 Path dst = null; 771 Path log = null; 772 EnumSet<Options> flags = EnumSet.noneOf(Options.class); 773 String presevedAttributes = null; 774 String mapredSslConf = null; 775 long filelimit = Long.MAX_VALUE; 776 long sizelimit = Long.MAX_VALUE; 777 778 for (int idx = 0; idx < args.length; idx++) { 779 Options[] opt = Options.values(); 780 int i = 0; 781 for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++); 782 783 if (i < opt.length) { 784 flags.add(opt[i]); 785 if (opt[i] == Options.PRESERVE_STATUS) { 786 presevedAttributes = args[idx].substring(2); 787 FileAttribute.parse(presevedAttributes); //validation 788 } 789 else if (opt[i] == Options.FILE_LIMIT) { 790 filelimit = Options.FILE_LIMIT.parseLong(args, ++idx); 791 } 792 else if (opt[i] == Options.SIZE_LIMIT) { 793 sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx); 794 } 795 } else if ("-f".equals(args[idx])) { 796 if (++idx == args.length) { 797 throw new IllegalArgumentException("urilist_uri not specified in -f"); 798 } 799 srcs.addAll(fetchFileList(conf, new Path(args[idx]))); 800 } else if ("-log".equals(args[idx])) { 801 if (++idx == args.length) { 802 throw new IllegalArgumentException("logdir not specified in -log"); 803 } 804 log = new Path(args[idx]); 805 } else if ("-mapredSslConf".equals(args[idx])) { 806 if (++idx == args.length) { 807 throw new IllegalArgumentException("ssl conf file not specified in -mapredSslConf"); 808 } 809 mapredSslConf = args[idx]; 810 } else if ("-m".equals(args[idx])) { 811 if (++idx == args.length) { 812 throw new IllegalArgumentException("num_maps not specified in -m"); 813 } 814 try { 815 conf.setInt(MAX_MAPS_LABEL, Integer.valueOf(args[idx])); 816 } catch (NumberFormatException e) { 817 throw new IllegalArgumentException("Invalid argument to -m: " + 818 args[idx]); 819 } 820 } else if ('-' == args[idx].codePointAt(0)) { 821 throw new IllegalArgumentException("Invalid switch " + args[idx]); 822 } else if (idx == args.length -1) { 823 dst = new Path(args[idx]); 824 } else { 825 srcs.add(new Path(args[idx])); 826 } 827 } 828 // mandatory command-line parameters 829 if (srcs.isEmpty() || dst == null) { 830 throw new IllegalArgumentException("Missing " 831 + (dst == null ? "dst path" : "src")); 832 } 833 // incompatible command-line flags 834 final boolean isOverwrite = flags.contains(Options.OVERWRITE); 835 final boolean isUpdate = flags.contains(Options.UPDATE); 836 final boolean isDelete = flags.contains(Options.DELETE); 837 final boolean skipCRC = flags.contains(Options.SKIPCRC); 838 839 if (isOverwrite && isUpdate) { 840 throw new IllegalArgumentException("Conflicting overwrite policies"); 841 } 842 if (isDelete && !isOverwrite && !isUpdate) { 843 throw new IllegalArgumentException(Options.DELETE.cmd 844 + " must be specified with " + Options.OVERWRITE + " or " 845 + Options.UPDATE + "."); 846 } 847 if (!isUpdate && skipCRC) { 848 throw new IllegalArgumentException( 849 Options.SKIPCRC.cmd + " is relevant only with the " + 850 Options.UPDATE.cmd + " option"); 851 } 852 return new Arguments(srcs, dst, log, flags, presevedAttributes, 853 filelimit, sizelimit, mapredSslConf); 854 } 855 856 /** {@inheritDoc} */ toString()857 public String toString() { 858 return getClass().getName() + "{" 859 + "\n srcs = " + srcs 860 + "\n dst = " + dst 861 + "\n log = " + log 862 + "\n flags = " + flags 863 + "\n preservedAttributes = " + preservedAttributes 864 + "\n filelimit = " + filelimit 865 + "\n sizelimit = " + sizelimit 866 + "\n mapredSslConf = " + mapredSslConf 867 + "\n}"; 868 } 869 } 870 871 /** 872 * This is the main driver for recursively copying directories 873 * across file systems. It takes at least two cmdline parameters. A source 874 * URL and a destination URL. It then essentially does an "ls -lR" on the 875 * source URL, and writes the output in a round-robin manner to all the map 876 * input files. The mapper actually copies the files allotted to it. The 877 * reduce is empty. 878 */ run(String[] args)879 public int run(String[] args) { 880 try { 881 copy(conf, Arguments.valueOf(args, conf)); 882 return 0; 883 } catch (IllegalArgumentException e) { 884 System.err.println(StringUtils.stringifyException(e) + "\n" + usage); 885 ToolRunner.printGenericCommandUsage(System.err); 886 return -1; 887 } catch (DuplicationException e) { 888 System.err.println(StringUtils.stringifyException(e)); 889 return DuplicationException.ERROR_CODE; 890 } catch (RemoteException e) { 891 final IOException unwrapped = e.unwrapRemoteException( 892 FileNotFoundException.class, 893 AccessControlException.class, 894 QuotaExceededException.class); 895 System.err.println(StringUtils.stringifyException(unwrapped)); 896 return -3; 897 } catch (Exception e) { 898 System.err.println("With failures, global counters are inaccurate; " + 899 "consider running with -i"); 900 System.err.println("Copy failed: " + StringUtils.stringifyException(e)); 901 return -999; 902 } 903 } 904 main(String[] args)905 public static void main(String[] args) throws Exception { 906 JobConf job = new JobConf(DistCp.class); 907 DistCp distcp = new DistCp(job); 908 int res = ToolRunner.run(distcp, args); 909 System.exit(res); 910 } 911 912 /** 913 * Make a path relative with respect to a root path. 914 * absPath is always assumed to descend from root. 915 * Otherwise returned path is null. 916 */ makeRelative(Path root, Path absPath)917 static String makeRelative(Path root, Path absPath) { 918 if (!absPath.isAbsolute()) { 919 throw new IllegalArgumentException("!absPath.isAbsolute(), absPath=" 920 + absPath); 921 } 922 String p = absPath.toUri().getPath(); 923 924 StringTokenizer pathTokens = new StringTokenizer(p, "/"); 925 for(StringTokenizer rootTokens = new StringTokenizer( 926 root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) { 927 if (!rootTokens.nextToken().equals(pathTokens.nextToken())) { 928 return null; 929 } 930 } 931 StringBuilder sb = new StringBuilder(); 932 for(; pathTokens.hasMoreTokens(); ) { 933 sb.append(pathTokens.nextToken()); 934 if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); } 935 } 936 return sb.length() == 0? ".": sb.toString(); 937 } 938 939 /** 940 * Calculate how many maps to run. 941 * Number of maps is bounded by a minimum of the cumulative size of the 942 * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the 943 * command line) and at most (distcp.max.map.tasks, default 944 * MAX_MAPS_PER_NODE * nodes in the cluster). 945 * @param totalBytes Count of total bytes for job 946 * @param job The job to configure 947 * @return Count of maps to run. 948 */ setMapCount(long totalBytes, JobConf job)949 private static void setMapCount(long totalBytes, JobConf job) 950 throws IOException { 951 int numMaps = 952 (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP)); 953 numMaps = Math.min(numMaps, 954 job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE * 955 new JobClient(job).getClusterStatus().getTaskTrackers())); 956 job.setNumMapTasks(Math.max(numMaps, 1)); 957 } 958 959 /** Fully delete dir */ fullyDelete(String dir, Configuration conf)960 static void fullyDelete(String dir, Configuration conf) throws IOException { 961 if (dir != null) { 962 Path tmp = new Path(dir); 963 tmp.getFileSystem(conf).delete(tmp, true); 964 } 965 } 966 967 //Job configuration createJobConf(Configuration conf)968 private static JobConf createJobConf(Configuration conf) { 969 JobConf jobconf = new JobConf(conf, DistCp.class); 970 jobconf.setJobName(NAME); 971 972 // turn off speculative execution, because DFS doesn't handle 973 // multiple writers to the same file. 974 jobconf.setMapSpeculativeExecution(false); 975 976 jobconf.setInputFormat(CopyInputFormat.class); 977 jobconf.setOutputKeyClass(Text.class); 978 jobconf.setOutputValueClass(Text.class); 979 980 jobconf.setMapperClass(CopyFilesMapper.class); 981 jobconf.setNumReduceTasks(0); 982 return jobconf; 983 } 984 985 private static final Random RANDOM = new Random(); getRandomId()986 public static String getRandomId() { 987 return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36); 988 } 989 990 /** 991 * Initialize DFSCopyFileMapper specific job-configuration. 992 * @param conf : The dfs/mapred configuration. 993 * @param jobConf : The handle to the jobConf object to be initialized. 994 * @param args Arguments 995 * @return true if it is necessary to launch a job. 996 */ setup(Configuration conf, JobConf jobConf, final Arguments args)997 private static boolean setup(Configuration conf, JobConf jobConf, 998 final Arguments args) 999 throws IOException { 1000 jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString()); 1001 1002 //set boolean values 1003 final boolean update = args.flags.contains(Options.UPDATE); 1004 final boolean skipCRCCheck = args.flags.contains(Options.SKIPCRC); 1005 final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE); 1006 jobConf.setBoolean(Options.UPDATE.propertyname, update); 1007 jobConf.setBoolean(Options.SKIPCRC.propertyname, skipCRCCheck); 1008 jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite); 1009 jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname, 1010 args.flags.contains(Options.IGNORE_READ_FAILURES)); 1011 jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname, 1012 args.flags.contains(Options.PRESERVE_STATUS)); 1013 1014 final String randomId = getRandomId(); 1015 JobClient jClient = new JobClient(jobConf); 1016 Path stagingArea; 1017 try { 1018 stagingArea = JobSubmissionFiles.getStagingDir(jClient, conf); 1019 } catch (InterruptedException e) { 1020 throw new IOException(e); 1021 } 1022 1023 Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId); 1024 FsPermission mapredSysPerms = 1025 new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); 1026 // FileSystem.mkdirs(jClient.getFs(), jobDirectory, mapredSysPerms); 1027 FileSystem.mkdirs(FileSystem.get(jobDirectory.toUri(),conf), jobDirectory, mapredSysPerms); 1028 jobConf.set(JOB_DIR_LABEL, jobDirectory.toString()); 1029 1030 FileSystem dstfs = args.dst.getFileSystem(conf); 1031 1032 // get tokens for all the required FileSystems.. 1033 TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), 1034 new Path[] {args.dst}, conf); 1035 1036 boolean dstExists = dstfs.exists(args.dst); 1037 boolean dstIsDir = false; 1038 if (dstExists) { 1039 dstIsDir = dstfs.getFileStatus(args.dst).isDir(); 1040 } 1041 1042 // default logPath 1043 Path logPath = args.log; 1044 if (logPath == null) { 1045 String filename = "_distcp_logs_" + randomId; 1046 if (!dstExists || !dstIsDir) { 1047 Path parent = args.dst.getParent(); 1048 if (!dstfs.exists(parent)) { 1049 dstfs.mkdirs(parent); 1050 } 1051 logPath = new Path(parent, filename); 1052 } else { 1053 logPath = new Path(args.dst, filename); 1054 } 1055 } 1056 FileOutputFormat.setOutputPath(jobConf, logPath); 1057 1058 // create src list, dst list 1059 FileSystem jobfs = jobDirectory.getFileSystem(jobConf); 1060 1061 Path srcfilelist = new Path(jobDirectory, "_distcp_src_files"); 1062 jobConf.set(SRC_LIST_LABEL, srcfilelist.toString()); 1063 SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf, 1064 srcfilelist, LongWritable.class, FilePair.class, 1065 SequenceFile.CompressionType.NONE); 1066 1067 Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files"); 1068 SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf, 1069 dstfilelist, Text.class, Text.class, 1070 SequenceFile.CompressionType.NONE); 1071 1072 Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs"); 1073 jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString()); 1074 SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf, 1075 dstdirlist, Text.class, FilePair.class, 1076 SequenceFile.CompressionType.NONE); 1077 1078 // handle the case where the destination directory doesn't exist 1079 // and we've only a single src directory OR we're updating/overwriting 1080 // the contents of the destination directory. 1081 final boolean special = 1082 (args.srcs.size() == 1 && !dstExists) || update || overwrite; 1083 int srcCount = 0, cnsyncf = 0, dirsyn = 0; 1084 long fileCount = 0L, byteCount = 0L, cbsyncs = 0L; 1085 try { 1086 for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) { 1087 final Path src = srcItr.next(); 1088 FileSystem srcfs = src.getFileSystem(conf); 1089 FileStatus srcfilestat = srcfs.getFileStatus(src); 1090 Path root = special && srcfilestat.isDir()? src: src.getParent(); 1091 if (srcfilestat.isDir()) { 1092 ++srcCount; 1093 } 1094 1095 Stack<FileStatus> pathstack = new Stack<FileStatus>(); 1096 for(pathstack.push(srcfilestat); !pathstack.empty(); ) { 1097 FileStatus cur = pathstack.pop(); 1098 FileStatus[] children = srcfs.listStatus(cur.getPath()); 1099 for(int i = 0; i < children.length; i++) { 1100 boolean skipfile = false; 1101 final FileStatus child = children[i]; 1102 final String dst = makeRelative(root, child.getPath()); 1103 ++srcCount; 1104 1105 if (child.isDir()) { 1106 pathstack.push(child); 1107 } 1108 else { 1109 //skip file if the src and the dst files are the same. 1110 skipfile = update && 1111 sameFile(srcfs, child, dstfs, 1112 new Path(args.dst, dst), skipCRCCheck); 1113 //skip file if it exceed file limit or size limit 1114 skipfile |= fileCount == args.filelimit 1115 || byteCount + child.getLen() > args.sizelimit; 1116 1117 if (!skipfile) { 1118 ++fileCount; 1119 byteCount += child.getLen(); 1120 1121 if (LOG.isTraceEnabled()) { 1122 LOG.trace("adding file " + child.getPath()); 1123 } 1124 1125 ++cnsyncf; 1126 cbsyncs += child.getLen(); 1127 if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) { 1128 src_writer.sync(); 1129 dst_writer.sync(); 1130 cnsyncf = 0; 1131 cbsyncs = 0L; 1132 } 1133 } 1134 } 1135 1136 if (!skipfile) { 1137 src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()), 1138 new FilePair(child, dst)); 1139 } 1140 1141 dst_writer.append(new Text(dst), 1142 new Text(child.getPath().toString())); 1143 } 1144 1145 if (cur.isDir()) { 1146 String dst = makeRelative(root, cur.getPath()); 1147 dir_writer.append(new Text(dst), new FilePair(cur, dst)); 1148 if (++dirsyn > SYNC_FILE_MAX) { 1149 dirsyn = 0; 1150 dir_writer.sync(); 1151 } 1152 } 1153 } 1154 } 1155 } finally { 1156 checkAndClose(src_writer); 1157 checkAndClose(dst_writer); 1158 checkAndClose(dir_writer); 1159 } 1160 1161 FileStatus dststatus = null; 1162 try { 1163 dststatus = dstfs.getFileStatus(args.dst); 1164 } catch(FileNotFoundException fnfe) { 1165 LOG.info(args.dst + " does not exist."); 1166 } 1167 1168 // create dest path dir if copying > 1 file 1169 if (dststatus == null) { 1170 if (srcCount > 1 && !dstfs.mkdirs(args.dst)) { 1171 throw new IOException("Failed to create" + args.dst); 1172 } 1173 } 1174 1175 final Path sorted = new Path(jobDirectory, "_distcp_sorted"); 1176 checkDuplication(jobfs, dstfilelist, sorted, conf); 1177 1178 if (dststatus != null && args.flags.contains(Options.DELETE)) { 1179 deleteNonexisting(dstfs, dststatus, sorted, 1180 jobfs, jobDirectory, jobConf, conf); 1181 } 1182 1183 Path tmpDir = new Path( 1184 (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)? 1185 args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId); 1186 jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString()); 1187 LOG.info("sourcePathsCount=" + srcCount); 1188 LOG.info("filesToCopyCount=" + fileCount); 1189 LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount)); 1190 jobConf.setInt(SRC_COUNT_LABEL, srcCount); 1191 jobConf.setLong(TOTAL_SIZE_LABEL, byteCount); 1192 setMapCount(byteCount, jobConf); 1193 return fileCount > 0; 1194 } 1195 1196 /** 1197 * Check whether the contents of src and dst are the same. 1198 * 1199 * Return false if dstpath does not exist 1200 * 1201 * If the files have different sizes, return false. 1202 * 1203 * If the files have the same sizes, the file checksums will be compared. 1204 * 1205 * When file checksum is not supported in any of file systems, 1206 * two files are considered as the same if they have the same size. 1207 */ sameFile(FileSystem srcfs, FileStatus srcstatus, FileSystem dstfs, Path dstpath, boolean skipCRCCheck)1208 static private boolean sameFile(FileSystem srcfs, FileStatus srcstatus, 1209 FileSystem dstfs, Path dstpath, boolean skipCRCCheck) throws IOException { 1210 FileStatus dststatus; 1211 try { 1212 dststatus = dstfs.getFileStatus(dstpath); 1213 } catch(FileNotFoundException fnfe) { 1214 return false; 1215 } 1216 1217 //same length? 1218 if (srcstatus.getLen() != dststatus.getLen()) { 1219 return false; 1220 } 1221 1222 if (skipCRCCheck) { 1223 LOG.debug("Skipping CRC Check"); 1224 return true; 1225 } 1226 1227 //get src checksum 1228 final FileChecksum srccs; 1229 try { 1230 srccs = srcfs.getFileChecksum(srcstatus.getPath()); 1231 } catch(FileNotFoundException fnfe) { 1232 /* 1233 * Two possible cases: 1234 * (1) src existed once but was deleted between the time period that 1235 * srcstatus was obtained and the try block above. 1236 * (2) srcfs does not support file checksum and (incorrectly) throws 1237 * FNFE, e.g. some previous versions of HftpFileSystem. 1238 * For case (1), it is okay to return true since src was already deleted. 1239 * For case (2), true should be returned. 1240 */ 1241 return true; 1242 } 1243 1244 //compare checksums 1245 try { 1246 final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath()); 1247 //return true if checksum is not supported 1248 //(i.e. some of the checksums is null) 1249 return srccs == null || dstcs == null || srccs.equals(dstcs); 1250 } catch(FileNotFoundException fnfe) { 1251 return false; 1252 } 1253 } 1254 1255 /** Delete the dst files/dirs which do not exist in src */ deleteNonexisting( FileSystem dstfs, FileStatus dstroot, Path dstsorted, FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf )1256 static private void deleteNonexisting( 1257 FileSystem dstfs, FileStatus dstroot, Path dstsorted, 1258 FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf 1259 ) throws IOException { 1260 if (!dstroot.isDir()) { 1261 throw new IOException("dst must be a directory when option " 1262 + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath() 1263 + ") is not a directory."); 1264 } 1265 1266 //write dst lsr results 1267 final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr"); 1268 final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf, 1269 dstlsr, Text.class, FileStatus.class, 1270 SequenceFile.CompressionType.NONE); 1271 try { 1272 //do lsr to get all file statuses in dstroot 1273 final Stack<FileStatus> lsrstack = new Stack<FileStatus>(); 1274 for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) { 1275 final FileStatus status = lsrstack.pop(); 1276 if (status.isDir()) { 1277 for(FileStatus child : dstfs.listStatus(status.getPath())) { 1278 String relative = makeRelative(dstroot.getPath(), child.getPath()); 1279 writer.append(new Text(relative), child); 1280 lsrstack.push(child); 1281 } 1282 } 1283 } 1284 } finally { 1285 checkAndClose(writer); 1286 } 1287 1288 //sort lsr results 1289 final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted"); 1290 SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs, 1291 new Text.Comparator(), Text.class, FileStatus.class, jobconf); 1292 sorter.sort(dstlsr, sortedlsr); 1293 1294 //compare lsr list and dst list 1295 SequenceFile.Reader lsrin = null; 1296 SequenceFile.Reader dstin = null; 1297 try { 1298 lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf); 1299 dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf); 1300 1301 //compare sorted lsr list and sorted dst list 1302 final Text lsrpath = new Text(); 1303 final FileStatus lsrstatus = new FileStatus(); 1304 final Text dstpath = new Text(); 1305 final Text dstfrom = new Text(); 1306 final FsShell shell = new FsShell(conf); 1307 final String[] shellargs = {"-rmr", null}; 1308 1309 boolean hasnext = dstin.next(dstpath, dstfrom); 1310 for(; lsrin.next(lsrpath, lsrstatus); ) { 1311 int dst_cmp_lsr = dstpath.compareTo(lsrpath); 1312 for(; hasnext && dst_cmp_lsr < 0; ) { 1313 hasnext = dstin.next(dstpath, dstfrom); 1314 dst_cmp_lsr = dstpath.compareTo(lsrpath); 1315 } 1316 1317 if (dst_cmp_lsr == 0) { 1318 //lsrpath exists in dst, skip it 1319 hasnext = dstin.next(dstpath, dstfrom); 1320 } 1321 else { 1322 //lsrpath does not exist, delete it 1323 String s = new Path(dstroot.getPath(), lsrpath.toString()).toString(); 1324 if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) { 1325 shellargs[1] = s; 1326 int r = 0; 1327 try { 1328 r = shell.run(shellargs); 1329 } catch(Exception e) { 1330 throw new IOException("Exception from shell.", e); 1331 } 1332 if (r != 0) { 1333 throw new IOException("\"" + shellargs[0] + " " + shellargs[1] 1334 + "\" returns non-zero value " + r); 1335 } 1336 } 1337 } 1338 } 1339 } finally { 1340 checkAndClose(lsrin); 1341 checkAndClose(dstin); 1342 } 1343 } 1344 1345 //is x an ancestor path of y? isAncestorPath(String x, String y)1346 static private boolean isAncestorPath(String x, String y) { 1347 if (!y.startsWith(x)) { 1348 return false; 1349 } 1350 final int len = x.length(); 1351 return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR; 1352 } 1353 1354 /** Check whether the file list have duplication. */ checkDuplication(FileSystem fs, Path file, Path sorted, Configuration conf)1355 static private void checkDuplication(FileSystem fs, Path file, Path sorted, 1356 Configuration conf) throws IOException { 1357 SequenceFile.Reader in = null; 1358 try { 1359 SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, 1360 new Text.Comparator(), Text.class, Text.class, conf); 1361 sorter.sort(file, sorted); 1362 in = new SequenceFile.Reader(fs, sorted, conf); 1363 1364 Text prevdst = null, curdst = new Text(); 1365 Text prevsrc = null, cursrc = new Text(); 1366 for(; in.next(curdst, cursrc); ) { 1367 if (prevdst != null && curdst.equals(prevdst)) { 1368 throw new DuplicationException( 1369 "Invalid input, there are duplicated files in the sources: " 1370 + prevsrc + ", " + cursrc); 1371 } 1372 prevdst = curdst; 1373 curdst = new Text(); 1374 prevsrc = cursrc; 1375 cursrc = new Text(); 1376 } 1377 } 1378 finally { 1379 checkAndClose(in); 1380 } 1381 } 1382 checkAndClose(java.io.Closeable io)1383 static boolean checkAndClose(java.io.Closeable io) { 1384 if (io != null) { 1385 try { 1386 io.close(); 1387 } 1388 catch(IOException ioe) { 1389 LOG.warn(StringUtils.stringifyException(ioe)); 1390 return false; 1391 } 1392 } 1393 return true; 1394 } 1395 1396 /** An exception class for duplicated source files. */ 1397 public static class DuplicationException extends IOException { 1398 private static final long serialVersionUID = 1L; 1399 /** Error code for this exception */ 1400 public static final int ERROR_CODE = -2; DuplicationException(String message)1401 DuplicationException(String message) {super(message);} 1402 } 1403 } 1404