1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.tools; 20 21 import java.io.BufferedReader; 22 import java.io.DataInput; 23 import java.io.DataOutput; 24 import java.io.FileNotFoundException; 25 import java.io.IOException; 26 import java.io.InputStreamReader; 27 import java.nio.charset.Charset; 28 import java.util.ArrayList; 29 import java.util.EnumSet; 30 import java.util.HashSet; 31 import java.util.Iterator; 32 import java.util.LinkedList; 33 import java.util.List; 34 import java.util.Random; 35 import java.util.Stack; 36 import java.util.StringTokenizer; 37 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 import org.apache.hadoop.conf.Configuration; 41 import org.apache.hadoop.fs.FSDataInputStream; 42 import org.apache.hadoop.fs.FSDataOutputStream; 43 import org.apache.hadoop.fs.FileAlreadyExistsException; 44 import org.apache.hadoop.fs.FileChecksum; 45 import org.apache.hadoop.fs.FileStatus; 46 import org.apache.hadoop.fs.FileSystem; 47 import org.apache.hadoop.fs.Path; 48 import org.apache.hadoop.fs.Trash; 49 import org.apache.hadoop.fs.permission.FsPermission; 50 import 
org.apache.hadoop.hdfs.protocol.QuotaExceededException; 51 import org.apache.hadoop.io.LongWritable; 52 import org.apache.hadoop.io.NullWritable; 53 import org.apache.hadoop.io.SequenceFile; 54 import org.apache.hadoop.io.SequenceFile.Reader; 55 import org.apache.hadoop.io.Text; 56 import org.apache.hadoop.io.Writable; 57 import org.apache.hadoop.io.WritableComparable; 58 import org.apache.hadoop.io.SequenceFile.Writer; 59 import org.apache.hadoop.ipc.RemoteException; 60 import org.apache.hadoop.mapred.FileOutputFormat; 61 import org.apache.hadoop.mapred.FileSplit; 62 import org.apache.hadoop.mapred.InputFormat; 63 import org.apache.hadoop.mapred.InputSplit; 64 import org.apache.hadoop.mapred.InvalidInputException; 65 import org.apache.hadoop.mapred.JobClient; 66 import org.apache.hadoop.mapred.JobConf; 67 import org.apache.hadoop.mapred.Mapper; 68 import org.apache.hadoop.mapred.OutputCollector; 69 import org.apache.hadoop.mapred.RecordReader; 70 import org.apache.hadoop.mapred.Reporter; 71 import org.apache.hadoop.mapred.SequenceFileRecordReader; 72 import org.apache.hadoop.mapreduce.JobSubmissionFiles; 73 import org.apache.hadoop.mapreduce.security.TokenCache; 74 import org.apache.hadoop.security.AccessControlException; 75 import org.apache.hadoop.util.StringUtils; 76 import org.apache.hadoop.util.Tool; 77 import org.apache.hadoop.util.ToolRunner; 78 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; 79 80 /** 81 * A Map-reduce program to recursively copy directories between 82 * different file-systems. 
83 */ 84 public class DistCpV1 implements Tool { 85 public static final Log LOG = LogFactory.getLog(DistCpV1.class); 86 87 private static final String NAME = "distcp"; 88 89 private static final String usage = NAME 90 + " [OPTIONS] <srcurl>* <desturl>" + 91 "\n\nOPTIONS:" + 92 "\n-p[rbugpt] Preserve status" + 93 "\n r: replication number" + 94 "\n b: block size" + 95 "\n u: user" + 96 "\n g: group" + 97 "\n p: permission" + 98 "\n t: modification and access times" + 99 "\n -p alone is equivalent to -prbugpt" + 100 "\n-i Ignore failures" + 101 "\n-basedir <basedir> Use <basedir> as the base directory when copying files from <srcurl>" + 102 "\n-log <logdir> Write logs to <logdir>" + 103 "\n-m <num_maps> Maximum number of simultaneous copies" + 104 "\n-overwrite Overwrite destination" + 105 "\n-update Overwrite if src size different from dst size" + 106 "\n-skipcrccheck Do not use CRC check to determine if src is " + 107 "\n different from dest. Relevant only if -update" + 108 "\n is specified" + 109 "\n-f <urilist_uri> Use list at <urilist_uri> as src list" + 110 "\n-filelimit <n> Limit the total number of files to be <= n" + 111 "\n-sizelimit <n> Limit the total size to be <= n bytes" + 112 "\n-delete Delete the files existing in the dst but not in src" + 113 "\n-dryrun Display count of files and total size of files" + 114 "\n in src and then exit. Copy is not done at all." + 115 "\n desturl should not be speicified with out -update." + 116 "\n-mapredSslConf <f> Filename of SSL configuration for mapper task" + 117 118 "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " + 119 "\n interpreted as an isomorphic update to an existing directory." 
+ 120 "\nFor example:" + 121 "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " + 122 "\"hdfs://B:8020/user/foo/baz\"\n" + 123 "\n would update all descendants of 'baz' also in 'bar'; it would " + 124 "\n *not* update /user/foo/baz/bar" + 125 126 "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " + 127 "\n specified with symbolic representation. For examples," + 128 "\n 1230k = 1230 * 1024 = 1259520" + 129 "\n 891g = 891 * 1024^3 = 956703965184" + 130 131 "\n"; 132 133 private static final long BYTES_PER_MAP = 256 * 1024 * 1024; 134 private static final int MAX_MAPS_PER_NODE = 20; 135 private static final int SYNC_FILE_MAX = 10; 136 private static final int DEFAULT_FILE_RETRIES = 3; 137 138 static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED } 139 static enum Options { 140 DELETE("-delete", NAME + ".delete"), 141 FILE_LIMIT("-filelimit", NAME + ".limit.file"), 142 SIZE_LIMIT("-sizelimit", NAME + ".limit.size"), 143 IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"), 144 PRESERVE_STATUS("-p", NAME + ".preserve.status"), 145 OVERWRITE("-overwrite", NAME + ".overwrite.always"), 146 UPDATE("-update", NAME + ".overwrite.ifnewer"), 147 SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check"); 148 149 final String cmd, propertyname; 150 Options(String cmd, String propertyname)151 private Options(String cmd, String propertyname) { 152 this.cmd = cmd; 153 this.propertyname = propertyname; 154 } 155 parseLong(String[] args, int offset)156 private long parseLong(String[] args, int offset) { 157 if (offset == args.length) { 158 throw new IllegalArgumentException("<n> not specified in " + cmd); 159 } 160 long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]); 161 if (n <= 0) { 162 throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd); 163 } 164 return n; 165 } 166 } 167 static enum FileAttribute { 168 BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION, TIMES; 169 170 final char symbol; 171 
FileAttribute()172 private FileAttribute() { 173 symbol = StringUtils.toLowerCase(toString()).charAt(0); 174 } 175 parse(String s)176 static EnumSet<FileAttribute> parse(String s) { 177 if (s == null || s.length() == 0) { 178 return EnumSet.allOf(FileAttribute.class); 179 } 180 181 EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class); 182 FileAttribute[] attributes = values(); 183 for(char c : s.toCharArray()) { 184 int i = 0; 185 for(; i < attributes.length && c != attributes[i].symbol; i++); 186 if (i < attributes.length) { 187 if (!set.contains(attributes[i])) { 188 set.add(attributes[i]); 189 } else { 190 throw new IllegalArgumentException("There are more than one '" 191 + attributes[i].symbol + "' in " + s); 192 } 193 } else { 194 throw new IllegalArgumentException("'" + c + "' in " + s 195 + " is undefined."); 196 } 197 } 198 return set; 199 } 200 } 201 202 static final String TMP_DIR_LABEL = NAME + ".tmp.dir"; 203 static final String DST_DIR_LABEL = NAME + ".dest.path"; 204 static final String JOB_DIR_LABEL = NAME + ".job.dir"; 205 static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks"; 206 static final String SRC_LIST_LABEL = NAME + ".src.list"; 207 static final String SRC_COUNT_LABEL = NAME + ".src.count"; 208 static final String TOTAL_SIZE_LABEL = NAME + ".total.size"; 209 static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list"; 210 static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map"; 211 static final String PRESERVE_STATUS_LABEL 212 = Options.PRESERVE_STATUS.propertyname + ".value"; 213 static final String FILE_RETRIES_LABEL = NAME + ".file.retries"; 214 215 private JobConf conf; 216 setConf(Configuration conf)217 public void setConf(Configuration conf) { 218 if (conf instanceof JobConf) { 219 this.conf = (JobConf) conf; 220 } else { 221 this.conf = new JobConf(conf); 222 } 223 } 224 getConf()225 public Configuration getConf() { 226 return conf; 227 } 228 DistCpV1(Configuration conf)229 public 
DistCpV1(Configuration conf) { 230 setConf(conf); 231 } 232 233 /** 234 * An input/output pair of filenames. 235 */ 236 static class FilePair implements Writable { 237 FileStatus input = new FileStatus(); 238 String output; FilePair()239 FilePair() { } FilePair(FileStatus input, String output)240 FilePair(FileStatus input, String output) { 241 this.input = input; 242 this.output = output; 243 } readFields(DataInput in)244 public void readFields(DataInput in) throws IOException { 245 input.readFields(in); 246 output = Text.readString(in); 247 } write(DataOutput out)248 public void write(DataOutput out) throws IOException { 249 input.write(out); 250 Text.writeString(out, output); 251 } toString()252 public String toString() { 253 return input + " : " + output; 254 } 255 } 256 257 /** 258 * InputFormat of a distcp job responsible for generating splits of the src 259 * file list. 260 */ 261 static class CopyInputFormat implements InputFormat<Text, Text> { 262 263 /** 264 * Produce splits such that each is no greater than the quotient of the 265 * total size and the number of splits requested. 
266 * @param job The handle to the JobConf object 267 * @param numSplits Number of splits requested 268 */ getSplits(JobConf job, int numSplits)269 public InputSplit[] getSplits(JobConf job, int numSplits) 270 throws IOException { 271 int cnfiles = job.getInt(SRC_COUNT_LABEL, -1); 272 long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1); 273 String srcfilelist = job.get(SRC_LIST_LABEL, ""); 274 if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) { 275 throw new RuntimeException("Invalid metadata: #files(" + cnfiles + 276 ") total_size(" + cbsize + ") listuri(" + 277 srcfilelist + ")"); 278 } 279 Path src = new Path(srcfilelist); 280 FileSystem fs = src.getFileSystem(job); 281 FileStatus srcst = fs.getFileStatus(src); 282 283 ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); 284 LongWritable key = new LongWritable(); 285 FilePair value = new FilePair(); 286 final long targetsize = cbsize / numSplits; 287 long pos = 0L; 288 long last = 0L; 289 long acc = 0L; 290 long cbrem = srcst.getLen(); 291 try (SequenceFile.Reader sl = 292 new SequenceFile.Reader(job, Reader.file(src))) { 293 for (; sl.next(key, value); last = sl.getPosition()) { 294 // if adding this split would put this split past the target size, 295 // cut the last split and put this next file in the next split. 296 if (acc + key.get() > targetsize && acc != 0) { 297 long splitsize = last - pos; 298 splits.add(new FileSplit(src, pos, splitsize, (String[])null)); 299 cbrem -= splitsize; 300 pos = last; 301 acc = 0L; 302 } 303 acc += key.get(); 304 } 305 } 306 if (cbrem != 0) { 307 splits.add(new FileSplit(src, pos, cbrem, (String[])null)); 308 } 309 310 return splits.toArray(new FileSplit[splits.size()]); 311 } 312 313 /** 314 * Returns a reader for this split of the src file list. 
315 */ getRecordReader(InputSplit split, JobConf job, Reporter reporter)316 public RecordReader<Text, Text> getRecordReader(InputSplit split, 317 JobConf job, Reporter reporter) throws IOException { 318 return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split); 319 } 320 } 321 322 /** 323 * FSCopyFilesMapper: The mapper for copying files between FileSystems. 324 */ 325 static class CopyFilesMapper 326 implements Mapper<LongWritable, FilePair, WritableComparable<?>, Text> { 327 // config 328 private int sizeBuf = 128 * 1024; 329 private FileSystem destFileSys = null; 330 private boolean ignoreReadFailures; 331 private boolean preserve_status; 332 private EnumSet<FileAttribute> preseved; 333 private boolean overwrite; 334 private boolean update; 335 private Path destPath = null; 336 private byte[] buffer = null; 337 private JobConf job; 338 private boolean skipCRCCheck = false; 339 340 // stats 341 private int failcount = 0; 342 private int skipcount = 0; 343 private int copycount = 0; 344 getCountString()345 private String getCountString() { 346 return "Copied: " + copycount + " Skipped: " + skipcount 347 + " Failed: " + failcount; 348 } updateStatus(Reporter reporter)349 private void updateStatus(Reporter reporter) { 350 reporter.setStatus(getCountString()); 351 } 352 353 /** 354 * Return true if dst should be replaced by src and the update flag is set. 355 * Right now, this merely checks that the src and dst len are not equal. 356 * This should be improved on once modification times, CRCs, etc. can 357 * be meaningful in this context. 
358 * @throws IOException 359 */ needsUpdate(FileStatus srcstatus, FileSystem dstfs, Path dstpath)360 private boolean needsUpdate(FileStatus srcstatus, 361 FileSystem dstfs, Path dstpath) throws IOException { 362 return update && !sameFile(srcstatus.getPath().getFileSystem(job), 363 srcstatus, dstfs, dstpath, skipCRCCheck); 364 } 365 create(Path f, Reporter reporter, FileStatus srcstat)366 private FSDataOutputStream create(Path f, Reporter reporter, 367 FileStatus srcstat) throws IOException { 368 if (destFileSys.exists(f)) { 369 destFileSys.delete(f, false); 370 } 371 if (!preserve_status) { 372 return destFileSys.create(f, true, sizeBuf, reporter); 373 } 374 375 FsPermission permission = preseved.contains(FileAttribute.PERMISSION)? 376 srcstat.getPermission(): null; 377 short replication = preseved.contains(FileAttribute.REPLICATION)? 378 srcstat.getReplication(): destFileSys.getDefaultReplication(f); 379 long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)? 380 srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(f); 381 return destFileSys.create(f, permission, true, sizeBuf, replication, 382 blockSize, reporter); 383 } 384 385 /** 386 * Validates copy by checking the sizes of files first and then 387 * checksums, if the filesystems support checksums. 
388 * @param srcstat src path and metadata 389 * @param absdst dst path 390 * @return true if src & destination files are same 391 */ validateCopy(FileStatus srcstat, Path absdst)392 private boolean validateCopy(FileStatus srcstat, Path absdst) 393 throws IOException { 394 if (destFileSys.exists(absdst)) { 395 if (sameFile(srcstat.getPath().getFileSystem(job), srcstat, 396 destFileSys, absdst, skipCRCCheck)) { 397 return true; 398 } 399 } 400 return false; 401 } 402 403 /** 404 * Increment number of files copied and bytes copied and then report status 405 */ updateCopyStatus(FileStatus srcstat, Reporter reporter)406 void updateCopyStatus(FileStatus srcstat, Reporter reporter) { 407 copycount++; 408 reporter.incrCounter(Counter.BYTESCOPIED, srcstat.getLen()); 409 reporter.incrCounter(Counter.COPY, 1); 410 updateStatus(reporter); 411 } 412 413 /** 414 * Skip copying this file if already exists at the destination. 415 * Updates counters and copy status if skipping this file. 416 * @return true if copy of this file can be skipped 417 */ skipCopyFile(FileStatus srcstat, Path absdst, OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)418 private boolean skipCopyFile(FileStatus srcstat, Path absdst, 419 OutputCollector<WritableComparable<?>, Text> outc, 420 Reporter reporter) throws IOException { 421 if (destFileSys.exists(absdst) && !overwrite 422 && !needsUpdate(srcstat, destFileSys, absdst)) { 423 outc.collect(null, new Text("SKIP: " + srcstat.getPath())); 424 ++skipcount; 425 reporter.incrCounter(Counter.SKIP, 1); 426 updateStatus(reporter); 427 return true; 428 } 429 return false; 430 } 431 432 /** 433 * Copies single file to the path specified by tmpfile. 
434 * @param srcstat src path and metadata 435 * @param tmpfile temporary file to which copy is to be done 436 * @param absdst actual destination path to which copy is to be done 437 * @param reporter 438 * @return Number of bytes copied 439 */ doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst, Reporter reporter)440 private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst, 441 Reporter reporter) throws IOException { 442 long bytesCopied = 0L; 443 Path srcPath = srcstat.getPath(); 444 // open src file 445 try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) { 446 reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); 447 // open tmp file 448 try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) { 449 LOG.info("Copying file " + srcPath + " of size " + 450 srcstat.getLen() + " bytes..."); 451 452 // copy file 453 for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) { 454 out.write(buffer, 0, bytesRead); 455 bytesCopied += bytesRead; 456 reporter.setStatus( 457 String.format("%.2f ", bytesCopied*100.0/srcstat.getLen()) 458 + absdst + " [ " + 459 TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / " 460 + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1) 461 + " ]"); 462 } 463 } 464 } 465 return bytesCopied; 466 } 467 468 /** 469 * Copy a file to a destination. 
470 * @param srcstat src path and metadata 471 * @param relativedst relative dst path 472 * @param outc Log of skipped files 473 * @param reporter 474 * @throws IOException if copy fails(even if the validation of copy fails) 475 */ copy(FileStatus srcstat, Path relativedst, OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)476 private void copy(FileStatus srcstat, Path relativedst, 477 OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter) 478 throws IOException { 479 Path absdst = new Path(destPath, relativedst); 480 int totfiles = job.getInt(SRC_COUNT_LABEL, -1); 481 assert totfiles >= 0 : "Invalid file count " + totfiles; 482 483 if (totfiles == 1) { 484 // Copying a single file; use dst path provided by user as 485 // destination file rather than destination directory 486 Path dstparent = absdst.getParent(); 487 if (!(destFileSys.exists(dstparent) && 488 destFileSys.getFileStatus(dstparent).isDirectory())) { 489 absdst = dstparent; 490 } 491 } 492 493 // if a directory, ensure created even if empty 494 if (srcstat.isDirectory()) { 495 if (destFileSys.exists(absdst)) { 496 if (destFileSys.getFileStatus(absdst).isFile()) { 497 throw new IOException("Failed to mkdirs: " + absdst+" is a file."); 498 } 499 } 500 else if (!destFileSys.mkdirs(absdst)) { 501 throw new IOException("Failed to mkdirs " + absdst); 502 } 503 // TODO: when modification times can be set, directories should be 504 // emitted to reducers so they might be preserved. Also, mkdirs does 505 // not currently return an error when the directory already exists; 506 // if this changes, all directory work might as well be done in reduce 507 return; 508 } 509 510 // Can we skip copying this file ? 
511 if (skipCopyFile(srcstat, absdst, outc, reporter)) { 512 return; 513 } 514 515 Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst); 516 // do the actual copy to tmpfile 517 long bytesCopied = doCopyFile(srcstat, tmpfile, absdst, reporter); 518 519 if (bytesCopied != srcstat.getLen()) { 520 throw new IOException("File size not matched: copied " 521 + bytesString(bytesCopied) + " to tmpfile (=" + tmpfile 522 + ") but expected " + bytesString(srcstat.getLen()) 523 + " from " + srcstat.getPath()); 524 } 525 else { 526 if (destFileSys.exists(absdst) && 527 destFileSys.getFileStatus(absdst).isDirectory()) { 528 throw new IOException(absdst + " is a directory"); 529 } 530 if (!destFileSys.mkdirs(absdst.getParent())) { 531 throw new IOException("Failed to create parent dir: " + absdst.getParent()); 532 } 533 rename(tmpfile, absdst); 534 535 if (!validateCopy(srcstat, absdst)) { 536 destFileSys.delete(absdst, false); 537 throw new IOException("Validation of copy of file " 538 + srcstat.getPath() + " failed."); 539 } 540 updateDestStatus(srcstat, destFileSys.getFileStatus(absdst)); 541 } 542 543 // report at least once for each file 544 updateCopyStatus(srcstat, reporter); 545 } 546 547 /** rename tmp to dst, delete dst if already exists */ rename(Path tmp, Path dst)548 private void rename(Path tmp, Path dst) throws IOException { 549 try { 550 if (destFileSys.exists(dst)) { 551 destFileSys.delete(dst, true); 552 } 553 if (!destFileSys.rename(tmp, dst)) { 554 throw new IOException(); 555 } 556 } 557 catch(IOException cause) { 558 throw (IOException)new IOException("Fail to rename tmp file (=" + tmp 559 + ") to destination file (=" + dst + ")").initCause(cause); 560 } 561 } 562 updateDestStatus(FileStatus src, FileStatus dst )563 private void updateDestStatus(FileStatus src, FileStatus dst 564 ) throws IOException { 565 if (preserve_status) { 566 DistCpV1.updateDestStatus(src, dst, preseved, destFileSys); 567 } 568 } 569 bytesString(long b)570 static String 
bytesString(long b) { 571 return b + " bytes (" + 572 TraditionalBinaryPrefix.long2String(b, "", 1) + ")"; 573 } 574 575 /** 576 * Copies a file and validates the copy by checking the checksums. 577 * If validation fails, retries (max number of tries is distcp.file.retries) 578 * to copy the file. 579 */ copyWithRetries(FileStatus srcstat, Path relativedst, OutputCollector<WritableComparable<?>, Text> out, Reporter reporter)580 void copyWithRetries(FileStatus srcstat, Path relativedst, 581 OutputCollector<WritableComparable<?>, Text> out, 582 Reporter reporter) throws IOException { 583 584 // max tries to copy when validation of copy fails 585 final int maxRetries = job.getInt(FILE_RETRIES_LABEL, DEFAULT_FILE_RETRIES); 586 // save update flag for later copies within the same map task 587 final boolean saveUpdate = update; 588 589 int retryCnt = 1; 590 for (; retryCnt <= maxRetries; retryCnt++) { 591 try { 592 //copy the file and validate copy 593 copy(srcstat, relativedst, out, reporter); 594 break;// copy successful 595 } catch (IOException e) { 596 LOG.warn("Copy of " + srcstat.getPath() + " failed.", e); 597 if (retryCnt < maxRetries) {// copy failed and need to retry 598 LOG.info("Retrying copy of file " + srcstat.getPath()); 599 update = true; // set update flag for retries 600 } 601 else {// no more retries... Give up 602 update = saveUpdate; 603 throw new IOException("Copy of file failed even with " + retryCnt 604 + " tries.", e); 605 } 606 } 607 } 608 } 609 610 /** Mapper configuration. 611 * Extracts source and destination file system, as well as 612 * top-level paths on source and destination directories. 613 * Gets the named file systems, to be used later in map. 
614 */ configure(JobConf job)615 public void configure(JobConf job) 616 { 617 destPath = new Path(job.get(DST_DIR_LABEL, "/")); 618 try { 619 destFileSys = destPath.getFileSystem(job); 620 } catch (IOException ex) { 621 throw new RuntimeException("Unable to get the named file system.", ex); 622 } 623 sizeBuf = job.getInt("copy.buf.size", 128 * 1024); 624 buffer = new byte[sizeBuf]; 625 ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false); 626 preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false); 627 if (preserve_status) { 628 preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL)); 629 } 630 update = job.getBoolean(Options.UPDATE.propertyname, false); 631 overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false); 632 skipCRCCheck = job.getBoolean(Options.SKIPCRC.propertyname, false); 633 this.job = job; 634 } 635 636 /** Map method. Copies one file from source file system to destination. 637 * @param key src len 638 * @param value FilePair (FileStatus src, Path dst) 639 * @param out Log of failed copies 640 * @param reporter 641 */ map(LongWritable key, FilePair value, OutputCollector<WritableComparable<?>, Text> out, Reporter reporter)642 public void map(LongWritable key, 643 FilePair value, 644 OutputCollector<WritableComparable<?>, Text> out, 645 Reporter reporter) throws IOException { 646 final FileStatus srcstat = value.input; 647 final Path relativedst = new Path(value.output); 648 try { 649 copyWithRetries(srcstat, relativedst, out, reporter); 650 } catch (IOException e) { 651 ++failcount; 652 reporter.incrCounter(Counter.FAIL, 1); 653 updateStatus(reporter); 654 final String sfailure = "FAIL " + relativedst + " : " + 655 StringUtils.stringifyException(e); 656 out.collect(null, new Text(sfailure)); 657 LOG.info(sfailure); 658 if (e instanceof FileNotFoundException) { 659 final String s = "Possible Cause for failure: Either the filesystem " 660 + 
srcstat.getPath().getFileSystem(job) 661 + " is not accessible or the file is deleted"; 662 LOG.error(s); 663 out.collect(null, new Text(s)); 664 } 665 666 try { 667 for (int i = 0; i < 3; ++i) { 668 try { 669 final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst); 670 if (destFileSys.delete(tmp, true)) 671 break; 672 } catch (Throwable ex) { 673 // ignore, we are just cleaning up 674 LOG.debug("Ignoring cleanup exception", ex); 675 } 676 // update status, so we don't get timed out 677 updateStatus(reporter); 678 Thread.sleep(3 * 1000); 679 } 680 } catch (InterruptedException inte) { 681 throw (IOException)new IOException().initCause(inte); 682 } 683 } finally { 684 updateStatus(reporter); 685 } 686 } 687 close()688 public void close() throws IOException { 689 if (0 == failcount || ignoreReadFailures) { 690 return; 691 } 692 throw new IOException(getCountString()); 693 } 694 } 695 fetchFileList(Configuration conf, Path srcList)696 private static List<Path> fetchFileList(Configuration conf, Path srcList) 697 throws IOException { 698 List<Path> result = new ArrayList<Path>(); 699 FileSystem fs = srcList.getFileSystem(conf); 700 try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.open(srcList), 701 Charset.forName("UTF-8")))) { 702 String line = input.readLine(); 703 while (line != null) { 704 result.add(new Path(line)); 705 line = input.readLine(); 706 } 707 } 708 return result; 709 } 710 711 @Deprecated copy(Configuration conf, String srcPath, String destPath, Path logPath, boolean srcAsList, boolean ignoreReadFailures)712 public static void copy(Configuration conf, String srcPath, 713 String destPath, Path logPath, 714 boolean srcAsList, boolean ignoreReadFailures) 715 throws IOException { 716 final Path src = new Path(srcPath); 717 List<Path> tmp = new ArrayList<Path>(); 718 if (srcAsList) { 719 tmp.addAll(fetchFileList(conf, src)); 720 } else { 721 tmp.add(src); 722 } 723 EnumSet<Options> flags = ignoreReadFailures 724 ? 
EnumSet.of(Options.IGNORE_READ_FAILURES) 725 : EnumSet.noneOf(Options.class); 726 727 final Path dst = new Path(destPath); 728 copy(conf, new Arguments(tmp, null, dst, logPath, flags, null, 729 Long.MAX_VALUE, Long.MAX_VALUE, null, false)); 730 } 731 732 /** Sanity check for srcPath */ checkSrcPath(JobConf jobConf, List<Path> srcPaths)733 private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths) 734 throws IOException { 735 List<IOException> rslt = new ArrayList<IOException>(); 736 List<Path> unglobbed = new LinkedList<Path>(); 737 738 Path[] ps = new Path[srcPaths.size()]; 739 ps = srcPaths.toArray(ps); 740 TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf); 741 742 743 for (Path p : srcPaths) { 744 FileSystem fs = p.getFileSystem(jobConf); 745 FileStatus[] inputs = fs.globStatus(p); 746 747 if(inputs != null && inputs.length > 0) { 748 for (FileStatus onePath: inputs) { 749 unglobbed.add(onePath.getPath()); 750 } 751 } else { 752 rslt.add(new IOException("Input source " + p + " does not exist.")); 753 } 754 } 755 if (!rslt.isEmpty()) { 756 throw new InvalidInputException(rslt); 757 } 758 srcPaths.clear(); 759 srcPaths.addAll(unglobbed); 760 } 761 762 /** 763 * Driver to copy srcPath to destPath depending on required protocol. 
764 * @param conf configuration 765 * @param args arguments 766 */ copy(final Configuration conf, final Arguments args )767 static void copy(final Configuration conf, final Arguments args 768 ) throws IOException { 769 LOG.info("srcPaths=" + args.srcs); 770 if (!args.dryrun || args.flags.contains(Options.UPDATE)) { 771 LOG.info("destPath=" + args.dst); 772 } 773 774 JobConf job = createJobConf(conf); 775 776 checkSrcPath(job, args.srcs); 777 if (args.preservedAttributes != null) { 778 job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes); 779 } 780 if (args.mapredSslConf != null) { 781 job.set("dfs.https.client.keystore.resource", args.mapredSslConf); 782 } 783 784 //Initialize the mapper 785 try { 786 if (setup(conf, job, args)) { 787 JobClient.runJob(job); 788 } 789 if(!args.dryrun) { 790 finalize(conf, job, args.dst, args.preservedAttributes); 791 } 792 } finally { 793 if (!args.dryrun) { 794 //delete tmp 795 fullyDelete(job.get(TMP_DIR_LABEL), job); 796 } 797 //delete jobDirectory 798 fullyDelete(job.get(JOB_DIR_LABEL), job); 799 } 800 } 801 updateDestStatus(FileStatus src, FileStatus dst, EnumSet<FileAttribute> preseved, FileSystem destFileSys )802 private static void updateDestStatus(FileStatus src, FileStatus dst, 803 EnumSet<FileAttribute> preseved, FileSystem destFileSys 804 ) throws IOException { 805 String owner = null; 806 String group = null; 807 if (preseved.contains(FileAttribute.USER) 808 && !src.getOwner().equals(dst.getOwner())) { 809 owner = src.getOwner(); 810 } 811 if (preseved.contains(FileAttribute.GROUP) 812 && !src.getGroup().equals(dst.getGroup())) { 813 group = src.getGroup(); 814 } 815 if (owner != null || group != null) { 816 destFileSys.setOwner(dst.getPath(), owner, group); 817 } 818 if (preseved.contains(FileAttribute.PERMISSION) 819 && !src.getPermission().equals(dst.getPermission())) { 820 destFileSys.setPermission(dst.getPath(), src.getPermission()); 821 } 822 if (preseved.contains(FileAttribute.TIMES)) { 823 
destFileSys.setTimes(dst.getPath(), src.getModificationTime(), src.getAccessTime()); 824 } 825 } 826 finalize(Configuration conf, JobConf jobconf, final Path destPath, String presevedAttributes)827 static private void finalize(Configuration conf, JobConf jobconf, 828 final Path destPath, String presevedAttributes) throws IOException { 829 if (presevedAttributes == null) { 830 return; 831 } 832 EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes); 833 if (!preseved.contains(FileAttribute.USER) 834 && !preseved.contains(FileAttribute.GROUP) 835 && !preseved.contains(FileAttribute.PERMISSION)) { 836 return; 837 } 838 839 FileSystem dstfs = destPath.getFileSystem(conf); 840 Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL)); 841 try (SequenceFile.Reader in = 842 new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) { 843 Text dsttext = new Text(); 844 FilePair pair = new FilePair(); 845 for(; in.next(dsttext, pair); ) { 846 Path absdst = new Path(destPath, pair.output); 847 updateDestStatus(pair.input, dstfs.getFileStatus(absdst), 848 preseved, dstfs); 849 } 850 } 851 } 852 853 static class Arguments { 854 final List<Path> srcs; 855 final Path basedir; 856 final Path dst; 857 final Path log; 858 final EnumSet<Options> flags; 859 final String preservedAttributes; 860 final long filelimit; 861 final long sizelimit; 862 final String mapredSslConf; 863 final boolean dryrun; 864 865 /** 866 * Arguments for distcp 867 * @param srcs List of source paths 868 * @param basedir Base directory for copy 869 * @param dst Destination path 870 * @param log Log output directory 871 * @param flags Command-line flags 872 * @param preservedAttributes Preserved attributes 873 * @param filelimit File limit 874 * @param sizelimit Size limit 875 * @param mapredSslConf ssl configuration 876 * @param dryrun 877 */ Arguments(List<Path> srcs, Path basedir, Path dst, Path log, EnumSet<Options> flags, String preservedAttributes, long filelimit, long sizelimit, 
String mapredSslConf, boolean dryrun)878 Arguments(List<Path> srcs, Path basedir, Path dst, Path log, 879 EnumSet<Options> flags, String preservedAttributes, 880 long filelimit, long sizelimit, String mapredSslConf, 881 boolean dryrun) { 882 this.srcs = srcs; 883 this.basedir = basedir; 884 this.dst = dst; 885 this.log = log; 886 this.flags = flags; 887 this.preservedAttributes = preservedAttributes; 888 this.filelimit = filelimit; 889 this.sizelimit = sizelimit; 890 this.mapredSslConf = mapredSslConf; 891 this.dryrun = dryrun; 892 893 if (LOG.isTraceEnabled()) { 894 LOG.trace("this = " + this); 895 } 896 } 897 valueOf(String[] args, Configuration conf )898 static Arguments valueOf(String[] args, Configuration conf 899 ) throws IOException { 900 List<Path> srcs = new ArrayList<Path>(); 901 Path dst = null; 902 Path log = null; 903 Path basedir = null; 904 EnumSet<Options> flags = EnumSet.noneOf(Options.class); 905 String presevedAttributes = null; 906 String mapredSslConf = null; 907 long filelimit = Long.MAX_VALUE; 908 long sizelimit = Long.MAX_VALUE; 909 boolean dryrun = false; 910 911 for (int idx = 0; idx < args.length; idx++) { 912 Options[] opt = Options.values(); 913 int i = 0; 914 for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++); 915 916 if (i < opt.length) { 917 flags.add(opt[i]); 918 if (opt[i] == Options.PRESERVE_STATUS) { 919 presevedAttributes = args[idx].substring(2); 920 FileAttribute.parse(presevedAttributes); //validation 921 } 922 else if (opt[i] == Options.FILE_LIMIT) { 923 filelimit = Options.FILE_LIMIT.parseLong(args, ++idx); 924 } 925 else if (opt[i] == Options.SIZE_LIMIT) { 926 sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx); 927 } 928 } else if ("-f".equals(args[idx])) { 929 if (++idx == args.length) { 930 throw new IllegalArgumentException("urilist_uri not specified in -f"); 931 } 932 srcs.addAll(fetchFileList(conf, new Path(args[idx]))); 933 } else if ("-log".equals(args[idx])) { 934 if (++idx == args.length) { 935 
throw new IllegalArgumentException("logdir not specified in -log"); 936 } 937 log = new Path(args[idx]); 938 } else if ("-basedir".equals(args[idx])) { 939 if (++idx == args.length) { 940 throw new IllegalArgumentException("basedir not specified in -basedir"); 941 } 942 basedir = new Path(args[idx]); 943 } else if ("-mapredSslConf".equals(args[idx])) { 944 if (++idx == args.length) { 945 throw new IllegalArgumentException("ssl conf file not specified in -mapredSslConf"); 946 } 947 mapredSslConf = args[idx]; 948 } else if ("-dryrun".equals(args[idx])) { 949 dryrun = true; 950 dst = new Path("/tmp/distcp_dummy_dest");//dummy destination 951 } else if ("-m".equals(args[idx])) { 952 if (++idx == args.length) { 953 throw new IllegalArgumentException("num_maps not specified in -m"); 954 } 955 try { 956 conf.setInt(MAX_MAPS_LABEL, Integer.parseInt(args[idx])); 957 } catch (NumberFormatException e) { 958 throw new IllegalArgumentException("Invalid argument to -m: " + 959 args[idx]); 960 } 961 } else if ('-' == args[idx].codePointAt(0)) { 962 throw new IllegalArgumentException("Invalid switch " + args[idx]); 963 } else if (idx == args.length -1 && 964 (!dryrun || flags.contains(Options.UPDATE))) { 965 dst = new Path(args[idx]); 966 } else { 967 srcs.add(new Path(args[idx])); 968 } 969 } 970 // mandatory command-line parameters 971 if (srcs.isEmpty() || dst == null) { 972 throw new IllegalArgumentException("Missing " 973 + (dst == null ? 
"dst path" : "src")); 974 } 975 // incompatible command-line flags 976 final boolean isOverwrite = flags.contains(Options.OVERWRITE); 977 final boolean isUpdate = flags.contains(Options.UPDATE); 978 final boolean isDelete = flags.contains(Options.DELETE); 979 final boolean skipCRC = flags.contains(Options.SKIPCRC); 980 if (isOverwrite && isUpdate) { 981 throw new IllegalArgumentException("Conflicting overwrite policies"); 982 } 983 if (!isUpdate && skipCRC) { 984 throw new IllegalArgumentException( 985 Options.SKIPCRC.cmd + " is relevant only with the " + 986 Options.UPDATE.cmd + " option"); 987 } 988 if (isDelete && !isOverwrite && !isUpdate) { 989 throw new IllegalArgumentException(Options.DELETE.cmd 990 + " must be specified with " + Options.OVERWRITE + " or " 991 + Options.UPDATE + "."); 992 } 993 return new Arguments(srcs, basedir, dst, log, flags, presevedAttributes, 994 filelimit, sizelimit, mapredSslConf, dryrun); 995 } 996 997 /** {@inheritDoc} */ toString()998 public String toString() { 999 return getClass().getName() + "{" 1000 + "\n srcs = " + srcs 1001 + "\n dst = " + dst 1002 + "\n log = " + log 1003 + "\n flags = " + flags 1004 + "\n preservedAttributes = " + preservedAttributes 1005 + "\n filelimit = " + filelimit 1006 + "\n sizelimit = " + sizelimit 1007 + "\n mapredSslConf = " + mapredSslConf 1008 + "\n}"; 1009 } 1010 } 1011 1012 /** 1013 * This is the main driver for recursively copying directories 1014 * across file systems. It takes at least two cmdline parameters. A source 1015 * URL and a destination URL. It then essentially does an "ls -lR" on the 1016 * source URL, and writes the output in a round-robin manner to all the map 1017 * input files. The mapper actually copies the files allotted to it. The 1018 * reduce is empty. 
1019 */ run(String[] args)1020 public int run(String[] args) { 1021 try { 1022 copy(conf, Arguments.valueOf(args, conf)); 1023 return 0; 1024 } catch (IllegalArgumentException e) { 1025 System.err.println(StringUtils.stringifyException(e) + "\n" + usage); 1026 ToolRunner.printGenericCommandUsage(System.err); 1027 return -1; 1028 } catch (DuplicationException e) { 1029 System.err.println(StringUtils.stringifyException(e)); 1030 return DuplicationException.ERROR_CODE; 1031 } catch (RemoteException e) { 1032 final IOException unwrapped = e.unwrapRemoteException( 1033 FileNotFoundException.class, 1034 AccessControlException.class, 1035 QuotaExceededException.class); 1036 System.err.println(StringUtils.stringifyException(unwrapped)); 1037 return -3; 1038 } catch (Exception e) { 1039 System.err.println("With failures, global counters are inaccurate; " + 1040 "consider running with -i"); 1041 System.err.println("Copy failed: " + StringUtils.stringifyException(e)); 1042 return -999; 1043 } 1044 } 1045 main(String[] args)1046 public static void main(String[] args) throws Exception { 1047 JobConf job = new JobConf(DistCpV1.class); 1048 DistCpV1 distcp = new DistCpV1(job); 1049 int res = ToolRunner.run(distcp, args); 1050 System.exit(res); 1051 } 1052 1053 /** 1054 * Make a path relative with respect to a root path. 1055 * absPath is always assumed to descend from root. 1056 * Otherwise returned path is null. 
1057 */ makeRelative(Path root, Path absPath)1058 static String makeRelative(Path root, Path absPath) { 1059 if (!absPath.isAbsolute()) { 1060 throw new IllegalArgumentException("!absPath.isAbsolute(), absPath=" 1061 + absPath); 1062 } 1063 String p = absPath.toUri().getPath(); 1064 1065 StringTokenizer pathTokens = new StringTokenizer(p, "/"); 1066 for(StringTokenizer rootTokens = new StringTokenizer( 1067 root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) { 1068 if (!rootTokens.nextToken().equals(pathTokens.nextToken())) { 1069 return null; 1070 } 1071 } 1072 StringBuilder sb = new StringBuilder(); 1073 for(; pathTokens.hasMoreTokens(); ) { 1074 sb.append(pathTokens.nextToken()); 1075 if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); } 1076 } 1077 return sb.length() == 0? ".": sb.toString(); 1078 } 1079 1080 /** 1081 * Calculate how many maps to run. 1082 * Number of maps is bounded by a minimum of the cumulative size of the 1083 * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the 1084 * command line) and at most (distcp.max.map.tasks, default 1085 * MAX_MAPS_PER_NODE * nodes in the cluster). 1086 * @param totalBytes Count of total bytes for job 1087 * @param job The job to configure 1088 * @return Count of maps to run. 
1089 */ setMapCount(long totalBytes, JobConf job)1090 private static int setMapCount(long totalBytes, JobConf job) 1091 throws IOException { 1092 int numMaps = 1093 (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP)); 1094 numMaps = Math.min(numMaps, 1095 job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE * 1096 new JobClient(job).getClusterStatus().getTaskTrackers())); 1097 numMaps = Math.max(numMaps, 1); 1098 job.setNumMapTasks(numMaps); 1099 return numMaps; 1100 } 1101 1102 /** Fully delete dir */ fullyDelete(String dir, Configuration conf)1103 static void fullyDelete(String dir, Configuration conf) throws IOException { 1104 if (dir != null) { 1105 Path tmp = new Path(dir); 1106 boolean success = tmp.getFileSystem(conf).delete(tmp, true); 1107 if (!success) { 1108 LOG.warn("Could not fully delete " + tmp); 1109 } 1110 } 1111 } 1112 1113 //Job configuration createJobConf(Configuration conf)1114 private static JobConf createJobConf(Configuration conf) { 1115 JobConf jobconf = new JobConf(conf, DistCpV1.class); 1116 jobconf.setJobName(conf.get("mapred.job.name", NAME)); 1117 1118 // turn off speculative execution, because DFS doesn't handle 1119 // multiple writers to the same file. 1120 jobconf.setMapSpeculativeExecution(false); 1121 1122 jobconf.setInputFormat(CopyInputFormat.class); 1123 jobconf.setOutputKeyClass(Text.class); 1124 jobconf.setOutputValueClass(Text.class); 1125 1126 jobconf.setMapperClass(CopyFilesMapper.class); 1127 jobconf.setNumReduceTasks(0); 1128 return jobconf; 1129 } 1130 1131 private static final Random RANDOM = new Random(); getRandomId()1132 public static String getRandomId() { 1133 return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36); 1134 } 1135 1136 /** 1137 * Increase the replication factor of _distcp_src_files to 1138 * sqrt(min(maxMapsOnCluster, numMaps)). 
This is to reduce the chance of 1139 * failing of distcp because of "not having a replication of _distcp_src_files 1140 * available for reading for some maps". 1141 */ setReplication(Configuration conf, JobConf jobConf, Path srcfilelist, int numMaps)1142 private static void setReplication(Configuration conf, JobConf jobConf, 1143 Path srcfilelist, int numMaps) throws IOException { 1144 int numMaxMaps = new JobClient(jobConf).getClusterStatus().getMaxMapTasks(); 1145 short replication = (short) Math.ceil( 1146 Math.sqrt(Math.min(numMaxMaps, numMaps))); 1147 FileSystem fs = srcfilelist.getFileSystem(conf); 1148 FileStatus srcStatus = fs.getFileStatus(srcfilelist); 1149 1150 if (srcStatus.getReplication() < replication) { 1151 if (!fs.setReplication(srcfilelist, replication)) { 1152 throw new IOException("Unable to increase the replication of file " + 1153 srcfilelist); 1154 } 1155 } 1156 } 1157 1158 /** 1159 * Does the dir already exist at destination ? 1160 * @return true if the dir already exists at destination 1161 */ dirExists(Configuration conf, Path dst)1162 private static boolean dirExists(Configuration conf, Path dst) 1163 throws IOException { 1164 FileSystem destFileSys = dst.getFileSystem(conf); 1165 FileStatus status = null; 1166 try { 1167 status = destFileSys.getFileStatus(dst); 1168 }catch (FileNotFoundException e) { 1169 return false; 1170 } 1171 if (status.isFile()) { 1172 throw new FileAlreadyExistsException("Not a dir: " + dst+" is a file."); 1173 } 1174 return true; 1175 } 1176 1177 /** 1178 * Initialize DFSCopyFileMapper specific job-configuration. 1179 * @param conf : The dfs/mapred configuration. 1180 * @param jobConf : The handle to the jobConf object to be initialized. 1181 * @param args Arguments 1182 * @return true if it is necessary to launch a job. 
   * @throws IOException if a file system operation fails, the destination is
   *         a file where a directory is required, basedir is not a directory
   *         or not a prefix of a source path, or duplicate destinations are
   *         detected
   */
  static boolean setup(Configuration conf, JobConf jobConf,
                            final Arguments args)
      throws IOException {
    jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString());

    //set boolean values
    final boolean update = args.flags.contains(Options.UPDATE);
    final boolean skipCRCCheck = args.flags.contains(Options.SKIPCRC);
    // overwrite is suppressed both by update and by a dry run
    final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE)
        && !args.dryrun;
    jobConf.setBoolean(Options.UPDATE.propertyname, update);
    jobConf.setBoolean(Options.SKIPCRC.propertyname, skipCRCCheck);
    jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
    jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
        args.flags.contains(Options.IGNORE_READ_FAILURES));
    jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
        args.flags.contains(Options.PRESERVE_STATUS));

    // One random id per run; reused for the job, log and tmp directory names.
    final String randomId = getRandomId();
    JobClient jClient = new JobClient(jobConf);
    Path stagingArea;
    try {
      stagingArea =
        JobSubmissionFiles.getStagingDir(jClient.getClusterHandle(), conf);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }

    // NOTE(review): this is plain string concatenation, so no path separator
    // is inserted between stagingArea and NAME -- confirm this is intended.
    Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId);
    FsPermission mapredSysPerms =
      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    FileSystem.mkdirs(jClient.getFs(), jobDirectory, mapredSysPerms);
    jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());

    long maxBytesPerMap = conf.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP);

    FileSystem dstfs = args.dst.getFileSystem(conf);

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(),
                                        new Path[] {args.dst}, conf);


    boolean dstExists = dstfs.exists(args.dst);
    boolean dstIsDir = false;
    if (dstExists) {
      dstIsDir = dstfs.getFileStatus(args.dst).isDirectory();
    }

    // default logPath
    // (inside dst when dst is an existing directory, otherwise next to it)
    Path logPath = args.log;
    if (logPath == null) {
      String filename = "_distcp_logs_" + randomId;
      if (!dstExists || !dstIsDir) {
        Path parent = args.dst.getParent();
        if (null == parent) {
          // If dst is '/' on S3, it might not exist yet, but dst.getParent()
          // will return null. In this case, use '/' as its own parent to prevent
          // NPE errors below.
          parent = args.dst;
        }
        if (!dstfs.exists(parent)) {
          dstfs.mkdirs(parent);
        }
        logPath = new Path(parent, filename);
      } else {
        logPath = new Path(args.dst, filename);
      }
    }
    FileOutputFormat.setOutputPath(jobConf, logPath);

    // create src list, dst list
    FileSystem jobfs = jobDirectory.getFileSystem(jobConf);

    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
    Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
    Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
    jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
    jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
    // srcCount: every source path seen; cnsyncf/cbsyncs: files/bytes appended
    // since the last sync of the src and dst lists; dirsyn: same for dir list.
    int srcCount = 0, cnsyncf = 0, dirsyn = 0;
    long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L,
         skipFileCount = 0L, skipByteCount = 0L;
    // src_writer: (length, FilePair) entries to copy;
    // dst_writer: (relative dst, source path) for every path seen;
    // dir_writer: (relative dst, FilePair) for directories.
    try (
        SequenceFile.Writer src_writer = SequenceFile.createWriter(jobConf,
            Writer.file(srcfilelist), Writer.keyClass(LongWritable.class),
            Writer.valueClass(FilePair.class), Writer.compression(
            SequenceFile.CompressionType.NONE));
        SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobConf,
            Writer.file(dstfilelist), Writer.keyClass(Text.class),
            Writer.valueClass(Text.class), Writer.compression(
            SequenceFile.CompressionType.NONE));
        SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobConf,
            Writer.file(dstdirlist), Writer.keyClass(Text.class),
            Writer.valueClass(FilePair.class), Writer.compression(
            SequenceFile.CompressionType.NONE));
    ) {
      // handle the case where the destination directory doesn't exist
      // and we've only a single src directory OR we're updating/overwriting
      // the contents of the destination directory.
      final boolean special =
        (args.srcs.size() == 1 && !dstExists) || update || overwrite;

      Path basedir = null;
      // parents between a source and basedir that were already recorded
      HashSet<Path> parentDirsToCopy = new HashSet<Path>();
      if (args.basedir != null) {
        FileSystem basefs = args.basedir.getFileSystem(conf);
        basedir = args.basedir.makeQualified(
            basefs.getUri(), basefs.getWorkingDirectory());
        if (!basefs.isDirectory(basedir)) {
          throw new IOException("Basedir " + basedir + " is not a directory.");
        }
      }

      for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
        final Path src = srcItr.next();
        FileSystem srcfs = src.getFileSystem(conf);
        FileStatus srcfilestat = srcfs.getFileStatus(src);
        // root is what destination-relative names are computed against
        Path root = special && srcfilestat.isDirectory()? src: src.getParent();
        if (dstExists && !dstIsDir &&
            (args.srcs.size() > 1 || srcfilestat.isDirectory())) {
          // destination should not be a file
          throw new IOException("Destination " + args.dst + " should be a dir" +
                                " if multiple source paths are there OR if" +
                                " the source path is a dir");
        }

        if (basedir != null) {
          // With -basedir, names are relative to basedir, and every directory
          // between the source and basedir is recorded once.
          root = basedir;
          Path parent = src.getParent().makeQualified(
              srcfs.getUri(), srcfs.getWorkingDirectory());
          while (parent != null && !parent.equals(basedir)) {
            if (!parentDirsToCopy.contains(parent)){
              parentDirsToCopy.add(parent);
              String dst = makeRelative(root, parent);
              FileStatus pst = srcfs.getFileStatus(parent);
              src_writer.append(new LongWritable(0), new FilePair(pst, dst));
              dst_writer.append(new Text(dst), new Text(parent.toString()));
              dir_writer.append(new Text(dst), new FilePair(pst, dst));
              if (++dirsyn > SYNC_FILE_MAX) {
                dirsyn = 0;
                dir_writer.sync();
              }
            }
            parent = parent.getParent();
          }

          // walked past the file system root without meeting basedir
          if (parent == null) {
            throw new IOException("Basedir " + basedir +
                " is not a prefix of source path " + src);
          }
        }

        if (srcfilestat.isDirectory()) {
          ++srcCount;
          final String dst = makeRelative(root,src);
          // with -update, a directory already present at dst is not re-created
          if (!update || !dirExists(conf, new Path(args.dst, dst))) {
            ++dirCount;
            src_writer.append(new LongWritable(0),
                new FilePair(srcfilestat, dst));
          }
          dst_writer.append(new Text(dst), new Text(src.toString()));
        }

        // Iterative walk of everything below src using an explicit stack.
        Stack<FileStatus> pathstack = new Stack<FileStatus>();
        for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
          FileStatus cur = pathstack.pop();
          FileStatus[] children = srcfs.listStatus(cur.getPath());
          for(int i = 0; i < children.length; i++) {
            boolean skipPath = false;
            final FileStatus child = children[i];
            final String dst = makeRelative(root, child.getPath());
            ++srcCount;

            if (child.isDirectory()) {
              pathstack.push(child);
              if (!update || !dirExists(conf, new Path(args.dst, dst))) {
                ++dirCount;
              }
              else {
                skipPath = true; // skip creating dir at destination
              }
            }
            else {
              Path destPath = new Path(args.dst, dst);
              if (cur.isFile() && (args.srcs.size() == 1)) {
                // Copying a single file; use dst path provided by user as
                // destination file rather than destination directory
                Path dstparent = destPath.getParent();
                FileSystem destFileSys = destPath.getFileSystem(jobConf);
                if (!(destFileSys.exists(dstparent) &&
                    destFileSys.getFileStatus(dstparent).isDirectory())) {
                  destPath = dstparent;
                }
              }
              //skip path if the src and the dst files are the same.
              skipPath = update &&
                  sameFile(srcfs, child, dstfs, destPath, skipCRCCheck);
              //skip path if it exceed file limit or size limit
              skipPath |= fileCount == args.filelimit
                          || byteCount + child.getLen() > args.sizelimit;

              if (!skipPath) {
                ++fileCount;
                byteCount += child.getLen();

                if (LOG.isTraceEnabled()) {
                  LOG.trace("adding file " + child.getPath());
                }

                // sync the lists once enough files or bytes were added
                ++cnsyncf;
                cbsyncs += child.getLen();
                if (cnsyncf > SYNC_FILE_MAX || cbsyncs > maxBytesPerMap) {
                  src_writer.sync();
                  dst_writer.sync();
                  cnsyncf = 0;
                  cbsyncs = 0L;
                }
              }
              else {
                ++skipFileCount;
                skipByteCount += child.getLen();
                if (LOG.isTraceEnabled()) {
                  LOG.trace("skipping file " + child.getPath());
                }
              }
            }

            if (!skipPath) {
              // directories are recorded with length 0
              src_writer.append(new LongWritable(child.isDirectory()? 0: child.getLen()),
                  new FilePair(child, dst));
            }

            // recorded even for skipped paths; this list feeds the duplicate
            // check and the -delete comparison below
            dst_writer.append(new Text(dst),
                new Text(child.getPath().toString()));
          }

          if (cur.isDirectory()) {
            String dst = makeRelative(root, cur.getPath());
            dir_writer.append(new Text(dst), new FilePair(cur, dst));
            if (++dirsyn > SYNC_FILE_MAX) {
              dirsyn = 0;
              dir_writer.sync();
            }
          }
        }
      }
    }
    LOG.info("sourcePathsCount(files+directories)=" + srcCount);
    LOG.info("filesToCopyCount=" + fileCount);
    LOG.info("bytesToCopyCount=" +
             TraditionalBinaryPrefix.long2String(byteCount, "", 1));
    if (update) {
      LOG.info("filesToSkipCopyCount=" + skipFileCount);
      LOG.info("bytesToSkipCopyCount=" +
               TraditionalBinaryPrefix.long2String(skipByteCount, "", 1));
    }
    // a dry run stops here: the counts are logged, no job is requested
    if (args.dryrun) {
      return false;
    }
    int mapCount = setMapCount(byteCount, jobConf);
    // Increase the replication of _distcp_src_files, if needed
    setReplication(conf, jobConf, srcfilelist, mapCount);

    FileStatus dststatus = null;
    try {
      dststatus = dstfs.getFileStatus(args.dst);
    } catch(FileNotFoundException fnfe) {
      LOG.info(args.dst + " does not exist.");
    }

    // create dest path dir if copying > 1 file
    if (dststatus == null) {
      if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
        // NOTE(review): the message has no space before the path.
        throw new IOException("Failed to create" + args.dst);
      }
    }

    final Path sorted = new Path(jobDirectory, "_distcp_sorted");
    checkDuplication(jobfs, dstfilelist, sorted, conf);

    if (dststatus != null && args.flags.contains(Options.DELETE)) {
      long deletedPathsCount = deleteNonexisting(dstfs, dststatus, sorted,
          jobfs, jobDirectory, jobConf, conf);
      LOG.info("deletedPathsFromDestCount(files+directories)=" +
          deletedPathsCount);
    }

    // tmp dir sits next to dst when dst is (or will become) a single file,
    // otherwise inside dst
    // NOTE(review): getParent() may be null when dst is a root path -- verify.
    Path tmpDir = new Path(
        (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
        args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
    jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());

    // Explicitly create the tmpDir to ensure that it can be cleaned
    // up by fullyDelete() later.
    tmpDir.getFileSystem(conf).mkdirs(tmpDir);

    LOG.info("sourcePathsCount=" + srcCount);
    LOG.info("filesToCopyCount=" + fileCount);
    LOG.info("bytesToCopyCount=" +
             TraditionalBinaryPrefix.long2String(byteCount, "", 1));
    jobConf.setInt(SRC_COUNT_LABEL, srcCount);
    jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);

    // a job is only needed when there is at least one file or dir to create
    return (fileCount + dirCount) > 0;
  }

  /**
   * Check whether the contents of src and dst are the same.
   *
   * Return false if dstpath does not exist
   *
   * If the files have different sizes, return false.
   *
   * If the files have the same sizes, the file checksums will be compared.
   *
   * When file checksum is not supported in any of file systems,
   * two files are considered as the same if they have the same size.
   *
   * @param srcfs source file system
   * @param srcstatus status of the source file
   * @param dstfs destination file system
   * @param dstpath destination path to compare against
   * @param skipCRCCheck if true, equal length alone counts as "same"
   * @return true if the two files are considered the same
   * @throws IOException on file system errors other than a missing file
   */
  static private boolean sameFile(FileSystem srcfs, FileStatus srcstatus,
      FileSystem dstfs, Path dstpath, boolean skipCRCCheck) throws IOException {
    FileStatus dststatus;
    try {
      dststatus = dstfs.getFileStatus(dstpath);
    } catch(FileNotFoundException fnfe) {
      return false;
    }

    //same length?
    if (srcstatus.getLen() != dststatus.getLen()) {
      return false;
    }

    if (skipCRCCheck) {
      LOG.debug("Skipping the CRC check");
      return true;
    }

    //get src checksum
    final FileChecksum srccs;
    try {
      srccs = srcfs.getFileChecksum(srcstatus.getPath());
    } catch(FileNotFoundException fnfe) {
      /*
       * Two possible cases:
       * (1) src existed once but was deleted between the time period that
       *     srcstatus was obtained and the try block above.
       * (2) srcfs does not support file checksum and (incorrectly) throws
       *     FNFE, e.g. some previous versions of HftpFileSystem.
       * For case (1), it is okay to return true since src was already deleted.
       * For case (2), true should be returned.
       */
      return true;
    }

    //compare checksums
    try {
      final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath());
      //return true if checksum is not supported
      //(i.e. some of the checksums is null)
      return srccs == null || dstcs == null || srccs.equals(dstcs);
    } catch(FileNotFoundException fnfe) {
      // dst disappeared after its status was read
      return false;
    }
  }

  /**
   * Delete the dst files/dirs which do not exist in src
   *
   * A recursive listing of dstroot is written to a file in jobdir, sorted,
   * and merged against the sorted destination list; every listed path that
   * has no counterpart in that list is moved to trash, or deleted
   * recursively if the trash move does not succeed.
   *
   * @param dstfs destination file system
   * @param dstroot status of the destination root; must be a directory
   * @param dstsorted sorted list of destination-relative paths from the source
   * @param jobfs file system holding the job directory
   * @param jobdir job directory used for the temporary listing files
   * @param jobconf job configuration used for the sequence files
   * @param conf configuration used for the trash
   * @return total count of files and directories deleted from destination;
   *         paths below an already removed directory are counted as well
   * @throws IOException if dstroot is a file, a path cannot be deleted, or a
   *         file system operation fails
   */
  static private long deleteNonexisting(
      FileSystem dstfs, FileStatus dstroot, Path dstsorted,
      FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
      ) throws IOException {
    if (dstroot.isFile()) {
      throw new IOException("dst must be a directory when option "
          + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
          + ") is not a directory.");
    }

    //write dst lsr results
    final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
    try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
        Writer.file(dstlsr), Writer.keyClass(Text.class),
        Writer.valueClass(NullWritable.class), Writer.compression(
        SequenceFile.CompressionType.NONE))) {
      //do lsr to get all file statuses in dstroot
      final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
      for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
        final FileStatus status = lsrstack.pop();
        if (status.isDirectory()) {
          for(FileStatus child : dstfs.listStatus(status.getPath())) {
            String relative = makeRelative(dstroot.getPath(), child.getPath());
            writer.append(new Text(relative), NullWritable.get());
            lsrstack.push(child);
          }
        }
      }
    }

    //sort lsr results
    final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
        new Text.Comparator(), Text.class, NullWritable.class, jobconf);
    sorter.sort(dstlsr, sortedlsr);

    //compare lsr list and dst list
    long deletedPathsCount = 0;
    try (SequenceFile.Reader lsrin =
             new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
         SequenceFile.Reader dstin =
             new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
      //compare sorted lsr list and sorted dst list
      final Text lsrpath = new Text();
      final Text dstpath = new Text();
      final Text dstfrom = new Text();
      final Trash trash = new Trash(dstfs, conf);
      // most recently removed path; used to skip its descendants
      Path lastpath = null;

      // Merge of two sorted lists: advance dstin until it reaches or passes
      // the current lsr entry.
      boolean hasnext = dstin.next(dstpath, dstfrom);
      while (lsrin.next(lsrpath, NullWritable.get())) {
        int dst_cmp_lsr = dstpath.compareTo(lsrpath);
        while (hasnext && dst_cmp_lsr < 0) {
          hasnext = dstin.next(dstpath, dstfrom);
          dst_cmp_lsr = dstpath.compareTo(lsrpath);
        }

        if (dst_cmp_lsr == 0) {
          //lsrpath exists in dst, skip it
          hasnext = dstin.next(dstpath, dstfrom);
        } else {
          //lsrpath does not exist, delete it
          final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString());
          ++deletedPathsCount;
          // nothing to do when an ancestor was just removed
          if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) {
            if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) {
              throw new IOException("Failed to delete " + rmpath);
            }
            lastpath = rmpath;
          }
        }
      }
    }
    return deletedPathsCount;
  }

  //is x an ancestor path of y?
  //(string comparison: y equals x, or y starts with x followed by a separator)
  static private boolean isAncestorPath(Path xp, Path yp) {
    final String x = xp.toString();
    final String y = yp.toString();
    if (!y.startsWith(x)) {
      return false;
    }
    final int len = x.length();
    return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR;
  }

  /**
   * Check whether the file list have duplication.
   * The list is sorted by key into <code>sorted</code> and scanned for two
   * adjacent records with the same key.
   *
   * @param fs file system used by the sorter
   * @param file unsorted list of (destination, source) records
   * @param sorted path the sorted list is written to
   * @param conf configuration for the sorter and reader
   * @throws DuplicationException if two records share the same destination
   * @throws IOException if sorting or reading fails
   */
  static private void checkDuplication(FileSystem fs, Path file, Path sorted,
    Configuration conf) throws IOException {
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
      new Text.Comparator(), Text.class, Text.class, conf);
    sorter.sort(file, sorted);
    try (SequenceFile.Reader in =
         new SequenceFile.Reader(conf, Reader.file(sorted))) {
      Text prevdst = null, curdst = new Text();
      Text prevsrc = null, cursrc = new Text();
      for(; in.next(curdst, cursrc); ) {
        if (prevdst != null && curdst.equals(prevdst)) {
          throw new DuplicationException(
            "Invalid input, there are duplicated files in the sources: "
            + prevsrc + ", " + cursrc);
        }
        // keep the record just read and read the next one into fresh objects
        prevdst = curdst;
        curdst = new Text();
        prevsrc = cursrc;
        cursrc = new Text();
      }
    }
  }

  /** An exception class for duplicated source files. */
  public static class DuplicationException extends IOException {
    private static final long serialVersionUID = 1L;
    /** Error code for this exception */
    public static final int ERROR_CODE = -2;
    DuplicationException(String message) {super(message);}
  }
}