1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.tools;
20 
21 import java.io.BufferedReader;
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.FileNotFoundException;
25 import java.io.IOException;
26 import java.io.InputStreamReader;
27 import java.nio.charset.Charset;
28 import java.util.ArrayList;
29 import java.util.EnumSet;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Random;
35 import java.util.Stack;
36 import java.util.StringTokenizer;
37 
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FSDataInputStream;
42 import org.apache.hadoop.fs.FSDataOutputStream;
43 import org.apache.hadoop.fs.FileAlreadyExistsException;
44 import org.apache.hadoop.fs.FileChecksum;
45 import org.apache.hadoop.fs.FileStatus;
46 import org.apache.hadoop.fs.FileSystem;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.fs.Trash;
49 import org.apache.hadoop.fs.permission.FsPermission;
50 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
51 import org.apache.hadoop.io.LongWritable;
52 import org.apache.hadoop.io.NullWritable;
53 import org.apache.hadoop.io.SequenceFile;
54 import org.apache.hadoop.io.SequenceFile.Reader;
55 import org.apache.hadoop.io.Text;
56 import org.apache.hadoop.io.Writable;
57 import org.apache.hadoop.io.WritableComparable;
58 import org.apache.hadoop.io.SequenceFile.Writer;
59 import org.apache.hadoop.ipc.RemoteException;
60 import org.apache.hadoop.mapred.FileOutputFormat;
61 import org.apache.hadoop.mapred.FileSplit;
62 import org.apache.hadoop.mapred.InputFormat;
63 import org.apache.hadoop.mapred.InputSplit;
64 import org.apache.hadoop.mapred.InvalidInputException;
65 import org.apache.hadoop.mapred.JobClient;
66 import org.apache.hadoop.mapred.JobConf;
67 import org.apache.hadoop.mapred.Mapper;
68 import org.apache.hadoop.mapred.OutputCollector;
69 import org.apache.hadoop.mapred.RecordReader;
70 import org.apache.hadoop.mapred.Reporter;
71 import org.apache.hadoop.mapred.SequenceFileRecordReader;
72 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
73 import org.apache.hadoop.mapreduce.security.TokenCache;
74 import org.apache.hadoop.security.AccessControlException;
75 import org.apache.hadoop.util.StringUtils;
76 import org.apache.hadoop.util.Tool;
77 import org.apache.hadoop.util.ToolRunner;
78 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
79 
80 /**
81  * A Map-reduce program to recursively copy directories between
82  * different file-systems.
83  */
84 public class DistCpV1 implements Tool {
  /** Logger used by this class, including the nested CopyFilesMapper. */
  public static final Log LOG = LogFactory.getLog(DistCpV1.class);

  /** Tool name; also the prefix of the configuration keys declared in this class. */
  private static final String NAME = "distcp";
88 
89   private static final String usage = NAME
90     + " [OPTIONS] <srcurl>* <desturl>" +
91     "\n\nOPTIONS:" +
92     "\n-p[rbugpt]             Preserve status" +
93     "\n                       r: replication number" +
94     "\n                       b: block size" +
95     "\n                       u: user" +
96     "\n                       g: group" +
97     "\n                       p: permission" +
98     "\n                       t: modification and access times" +
99     "\n                       -p alone is equivalent to -prbugpt" +
100     "\n-i                     Ignore failures" +
101     "\n-basedir <basedir>     Use <basedir> as the base directory when copying files from <srcurl>" +
102     "\n-log <logdir>          Write logs to <logdir>" +
103     "\n-m <num_maps>          Maximum number of simultaneous copies" +
104     "\n-overwrite             Overwrite destination" +
105     "\n-update                Overwrite if src size different from dst size" +
106     "\n-skipcrccheck          Do not use CRC check to determine if src is " +
107     "\n                       different from dest. Relevant only if -update" +
108     "\n                       is specified" +
109     "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
110     "\n-filelimit <n>         Limit the total number of files to be <= n" +
111     "\n-sizelimit <n>         Limit the total size to be <= n bytes" +
112     "\n-delete                Delete the files existing in the dst but not in src" +
113     "\n-dryrun                Display count of files and total size of files" +
114     "\n                        in src and then exit. Copy is not done at all." +
115     "\n                        desturl should not be speicified with out -update." +
116     "\n-mapredSslConf <f>     Filename of SSL configuration for mapper task" +
117 
118     "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
119     "\n      interpreted as an isomorphic update to an existing directory." +
120     "\nFor example:" +
121     "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
122     "\"hdfs://B:8020/user/foo/baz\"\n" +
123     "\n     would update all descendants of 'baz' also in 'bar'; it would " +
124     "\n     *not* update /user/foo/baz/bar" +
125 
126     "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " +
127     "\n     specified with symbolic representation.  For examples," +
128     "\n       1230k = 1230 * 1024 = 1259520" +
129     "\n       891g = 891 * 1024^3 = 956703965184" +
130 
131     "\n";
132 
  // Job-sizing constants. The use sites of the first three are outside the
  // part of the file reviewed here, so the notes only state what the values show.
  /** 256 * 1024 * 1024 bytes (256 MiB); presumably the data volume targeted per map -- TODO confirm at the use site. */
  private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
  /** Presumably an upper bound on maps per node -- TODO confirm at the use site. */
  private static final int MAX_MAPS_PER_NODE = 20;
  /** NOTE(review): meaning not visible from here; confirm at the use site. */
  private static final int SYNC_FILE_MAX = 10;
  /** Default for FILE_RETRIES_LABEL, read by CopyFilesMapper.copyWithRetries(). */
  private static final int DEFAULT_FILE_RETRIES = 3;

  /**
   * Job counters updated by CopyFilesMapper: COPY and BYTESCOPIED after a
   * file is copied, SKIP when a file is skipped, FAIL when a copy fails, and
   * BYTESEXPECTED (source length) when a source file is opened for copying.
   */
  static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
139   static enum Options {
140     DELETE("-delete", NAME + ".delete"),
141     FILE_LIMIT("-filelimit", NAME + ".limit.file"),
142     SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
143     IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
144     PRESERVE_STATUS("-p", NAME + ".preserve.status"),
145     OVERWRITE("-overwrite", NAME + ".overwrite.always"),
146     UPDATE("-update", NAME + ".overwrite.ifnewer"),
147     SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check");
148 
149     final String cmd, propertyname;
150 
Options(String cmd, String propertyname)151     private Options(String cmd, String propertyname) {
152       this.cmd = cmd;
153       this.propertyname = propertyname;
154     }
155 
parseLong(String[] args, int offset)156     private long parseLong(String[] args, int offset) {
157       if (offset ==  args.length) {
158         throw new IllegalArgumentException("<n> not specified in " + cmd);
159       }
160       long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]);
161       if (n <= 0) {
162         throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd);
163       }
164       return n;
165     }
166   }
167   static enum FileAttribute {
168     BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION, TIMES;
169 
170     final char symbol;
171 
FileAttribute()172     private FileAttribute() {
173       symbol = StringUtils.toLowerCase(toString()).charAt(0);
174     }
175 
parse(String s)176     static EnumSet<FileAttribute> parse(String s) {
177       if (s == null || s.length() == 0) {
178         return EnumSet.allOf(FileAttribute.class);
179       }
180 
181       EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class);
182       FileAttribute[] attributes = values();
183       for(char c : s.toCharArray()) {
184         int i = 0;
185         for(; i < attributes.length && c != attributes[i].symbol; i++);
186         if (i < attributes.length) {
187           if (!set.contains(attributes[i])) {
188             set.add(attributes[i]);
189           } else {
190             throw new IllegalArgumentException("There are more than one '"
191                 + attributes[i].symbol + "' in " + s);
192           }
193         } else {
194           throw new IllegalArgumentException("'" + c + "' in " + s
195               + " is undefined.");
196         }
197       }
198       return set;
199     }
200   }
201 
  // Configuration keys carrying job parameters. Keys without a note are not
  // read in the part of the file reviewed here.
  /** Temporary directory that CopyFilesMapper copies files into before renaming them into place. */
  static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
  /** Destination root path; CopyFilesMapper.configure() falls back to "/" when unset. */
  static final String DST_DIR_LABEL = NAME + ".dest.path";
  static final String JOB_DIR_LABEL = NAME + ".job.dir";
  static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks";
  /** Location of the source-list sequence file read by CopyInputFormat.getSplits(). */
  static final String SRC_LIST_LABEL = NAME + ".src.list";
  /** Number of source entries; read by getSplits() and by CopyFilesMapper.copy(). */
  static final String SRC_COUNT_LABEL = NAME + ".src.count";
  /** Total size; getSplits() divides it by the requested split count to get the target split size. */
  static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
  static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
  static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
  /** Attribute flag string parsed by FileAttribute.parse() in CopyFilesMapper.configure(). */
  static final String PRESERVE_STATUS_LABEL
      = Options.PRESERVE_STATUS.propertyname + ".value";
  /** Maximum copy attempts per file; defaults to DEFAULT_FILE_RETRIES in copyWithRetries(). */
  static final String FILE_RETRIES_LABEL = NAME + ".file.retries";

  /** Configuration of this tool, installed by setConf() and returned by getConf(). */
  private JobConf conf;
216 
setConf(Configuration conf)217   public void setConf(Configuration conf) {
218     if (conf instanceof JobConf) {
219       this.conf = (JobConf) conf;
220     } else {
221       this.conf = new JobConf(conf);
222     }
223   }
224 
getConf()225   public Configuration getConf() {
226     return conf;
227   }
228 
DistCpV1(Configuration conf)229   public DistCpV1(Configuration conf) {
230     setConf(conf);
231   }
232 
233   /**
234    * An input/output pair of filenames.
235    */
236   static class FilePair implements Writable {
237     FileStatus input = new FileStatus();
238     String output;
FilePair()239     FilePair() { }
FilePair(FileStatus input, String output)240     FilePair(FileStatus input, String output) {
241       this.input = input;
242       this.output = output;
243     }
readFields(DataInput in)244     public void readFields(DataInput in) throws IOException {
245       input.readFields(in);
246       output = Text.readString(in);
247     }
write(DataOutput out)248     public void write(DataOutput out) throws IOException {
249       input.write(out);
250       Text.writeString(out, output);
251     }
toString()252     public String toString() {
253       return input + " : " + output;
254     }
255   }
256 
257   /**
258    * InputFormat of a distcp job responsible for generating splits of the src
259    * file list.
260    */
261   static class CopyInputFormat implements InputFormat<Text, Text> {
262 
263     /**
264      * Produce splits such that each is no greater than the quotient of the
265      * total size and the number of splits requested.
266      * @param job The handle to the JobConf object
267      * @param numSplits Number of splits requested
268      */
getSplits(JobConf job, int numSplits)269     public InputSplit[] getSplits(JobConf job, int numSplits)
270         throws IOException {
271       int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
272       long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
273       String srcfilelist = job.get(SRC_LIST_LABEL, "");
274       if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
275         throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
276                                    ") total_size(" + cbsize + ") listuri(" +
277                                    srcfilelist + ")");
278       }
279       Path src = new Path(srcfilelist);
280       FileSystem fs = src.getFileSystem(job);
281       FileStatus srcst = fs.getFileStatus(src);
282 
283       ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
284       LongWritable key = new LongWritable();
285       FilePair value = new FilePair();
286       final long targetsize = cbsize / numSplits;
287       long pos = 0L;
288       long last = 0L;
289       long acc = 0L;
290       long cbrem = srcst.getLen();
291       try (SequenceFile.Reader sl =
292           new SequenceFile.Reader(job, Reader.file(src))) {
293         for (; sl.next(key, value); last = sl.getPosition()) {
294           // if adding this split would put this split past the target size,
295           // cut the last split and put this next file in the next split.
296           if (acc + key.get() > targetsize && acc != 0) {
297             long splitsize = last - pos;
298             splits.add(new FileSplit(src, pos, splitsize, (String[])null));
299             cbrem -= splitsize;
300             pos = last;
301             acc = 0L;
302           }
303           acc += key.get();
304         }
305       }
306       if (cbrem != 0) {
307         splits.add(new FileSplit(src, pos, cbrem, (String[])null));
308       }
309 
310       return splits.toArray(new FileSplit[splits.size()]);
311     }
312 
313     /**
314      * Returns a reader for this split of the src file list.
315      */
getRecordReader(InputSplit split, JobConf job, Reporter reporter)316     public RecordReader<Text, Text> getRecordReader(InputSplit split,
317         JobConf job, Reporter reporter) throws IOException {
318       return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split);
319     }
320   }
321 
  /**
   * CopyFilesMapper: The mapper for copying files between FileSystems.
   */
325   static class CopyFilesMapper
326       implements Mapper<LongWritable, FilePair, WritableComparable<?>, Text> {
    // config
    // Copy buffer size in bytes; configure() replaces it with "copy.buf.size".
    private int sizeBuf = 128 * 1024;
    // File system of destPath, resolved in configure().
    private FileSystem destFileSys = null;
    // When true, close() does not throw even if some copies failed.
    private boolean ignoreReadFailures;
    // True when Options.PRESERVE_STATUS is set in the job configuration.
    private boolean preserve_status;
    // Attributes to preserve; assigned in configure() only when
    // preserve_status is true. (The name is a misspelling of "preserved".)
    private EnumSet<FileAttribute> preseved;
    // True only when the overwrite option is set and update is not.
    private boolean overwrite;
    // Update mode; copyWithRetries() sets this to true before retrying a
    // failed copy.
    private boolean update;
    // Destination root taken from DST_DIR_LABEL (default "/").
    private Path destPath = null;
    // Reusable copy buffer of sizeBuf bytes, allocated in configure().
    private byte[] buffer = null;
    // Job configuration captured in configure().
    private JobConf job;
    // Forwarded to sameFile() by needsUpdate() and validateCopy().
    private boolean skipCRCCheck = false;

    // stats
    // Per-mapper tallies, rendered by getCountString().
    private int failcount = 0;
    private int skipcount = 0;
    private int copycount = 0;
344 
getCountString()345     private String getCountString() {
346       return "Copied: " + copycount + " Skipped: " + skipcount
347           + " Failed: " + failcount;
348     }
updateStatus(Reporter reporter)349     private void updateStatus(Reporter reporter) {
350       reporter.setStatus(getCountString());
351     }
352 
353     /**
354      * Return true if dst should be replaced by src and the update flag is set.
355      * Right now, this merely checks that the src and dst len are not equal.
356      * This should be improved on once modification times, CRCs, etc. can
357      * be meaningful in this context.
358      * @throws IOException
359      */
needsUpdate(FileStatus srcstatus, FileSystem dstfs, Path dstpath)360     private boolean needsUpdate(FileStatus srcstatus,
361         FileSystem dstfs, Path dstpath) throws IOException {
362       return update && !sameFile(srcstatus.getPath().getFileSystem(job),
363           srcstatus, dstfs, dstpath, skipCRCCheck);
364     }
365 
create(Path f, Reporter reporter, FileStatus srcstat)366     private FSDataOutputStream create(Path f, Reporter reporter,
367         FileStatus srcstat) throws IOException {
368       if (destFileSys.exists(f)) {
369         destFileSys.delete(f, false);
370       }
371       if (!preserve_status) {
372         return destFileSys.create(f, true, sizeBuf, reporter);
373       }
374 
375       FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
376           srcstat.getPermission(): null;
377       short replication = preseved.contains(FileAttribute.REPLICATION)?
378           srcstat.getReplication(): destFileSys.getDefaultReplication(f);
379       long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
380           srcstat.getBlockSize(): destFileSys.getDefaultBlockSize(f);
381       return destFileSys.create(f, permission, true, sizeBuf, replication,
382           blockSize, reporter);
383     }
384 
385     /**
386      * Validates copy by checking the sizes of files first and then
387      * checksums, if the filesystems support checksums.
388      * @param srcstat src path and metadata
389      * @param absdst dst path
390      * @return true if src & destination files are same
391      */
validateCopy(FileStatus srcstat, Path absdst)392     private boolean validateCopy(FileStatus srcstat, Path absdst)
393             throws IOException {
394       if (destFileSys.exists(absdst)) {
395         if (sameFile(srcstat.getPath().getFileSystem(job), srcstat,
396             destFileSys, absdst, skipCRCCheck)) {
397           return true;
398         }
399       }
400       return false;
401     }
402 
403     /**
404      * Increment number of files copied and bytes copied and then report status
405      */
updateCopyStatus(FileStatus srcstat, Reporter reporter)406     void updateCopyStatus(FileStatus srcstat, Reporter reporter) {
407       copycount++;
408       reporter.incrCounter(Counter.BYTESCOPIED, srcstat.getLen());
409       reporter.incrCounter(Counter.COPY, 1);
410       updateStatus(reporter);
411     }
412 
413     /**
414      * Skip copying this file if already exists at the destination.
415      * Updates counters and copy status if skipping this file.
416      * @return true    if copy of this file can be skipped
417      */
skipCopyFile(FileStatus srcstat, Path absdst, OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)418     private boolean skipCopyFile(FileStatus srcstat, Path absdst,
419                             OutputCollector<WritableComparable<?>, Text> outc,
420                             Reporter reporter) throws IOException {
421       if (destFileSys.exists(absdst) && !overwrite
422           && !needsUpdate(srcstat, destFileSys, absdst)) {
423         outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
424         ++skipcount;
425         reporter.incrCounter(Counter.SKIP, 1);
426         updateStatus(reporter);
427         return true;
428       }
429       return false;
430     }
431 
432     /**
433      * Copies single file to the path specified by tmpfile.
434      * @param srcstat  src path and metadata
435      * @param tmpfile  temporary file to which copy is to be done
436      * @param absdst   actual destination path to which copy is to be done
437      * @param reporter
438      * @return Number of bytes copied
439      */
doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst, Reporter reporter)440     private long doCopyFile(FileStatus srcstat, Path tmpfile, Path absdst,
441                             Reporter reporter) throws IOException {
442       long bytesCopied = 0L;
443       Path srcPath = srcstat.getPath();
444       // open src file
445       try (FSDataInputStream in = srcPath.getFileSystem(job).open(srcPath)) {
446         reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
447         // open tmp file
448         try (FSDataOutputStream out = create(tmpfile, reporter, srcstat)) {
449           LOG.info("Copying file " + srcPath + " of size " +
450                    srcstat.getLen() + " bytes...");
451 
452           // copy file
453           for(int bytesRead; (bytesRead = in.read(buffer)) >= 0; ) {
454             out.write(buffer, 0, bytesRead);
455             bytesCopied += bytesRead;
456             reporter.setStatus(
457                 String.format("%.2f ", bytesCopied*100.0/srcstat.getLen())
458                 + absdst + " [ " +
459                 TraditionalBinaryPrefix.long2String(bytesCopied, "", 1) + " / "
460                 + TraditionalBinaryPrefix.long2String(srcstat.getLen(), "", 1)
461                 + " ]");
462           }
463         }
464       }
465       return bytesCopied;
466     }
467 
468     /**
469      * Copy a file to a destination.
470      * @param srcstat src path and metadata
471      * @param relativedst relative dst path
472      * @param outc Log of skipped files
473      * @param reporter
474      * @throws IOException if copy fails(even if the validation of copy fails)
475      */
copy(FileStatus srcstat, Path relativedst, OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)476     private void copy(FileStatus srcstat, Path relativedst,
477         OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)
478         throws IOException {
479       Path absdst = new Path(destPath, relativedst);
480       int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
481       assert totfiles >= 0 : "Invalid file count " + totfiles;
482 
483       if (totfiles == 1) {
484         // Copying a single file; use dst path provided by user as
485         // destination file rather than destination directory
486         Path dstparent = absdst.getParent();
487         if (!(destFileSys.exists(dstparent) &&
488               destFileSys.getFileStatus(dstparent).isDirectory())) {
489           absdst = dstparent;
490         }
491       }
492 
493       // if a directory, ensure created even if empty
494       if (srcstat.isDirectory()) {
495         if (destFileSys.exists(absdst)) {
496           if (destFileSys.getFileStatus(absdst).isFile()) {
497             throw new IOException("Failed to mkdirs: " + absdst+" is a file.");
498           }
499         }
500         else if (!destFileSys.mkdirs(absdst)) {
501           throw new IOException("Failed to mkdirs " + absdst);
502         }
503         // TODO: when modification times can be set, directories should be
504         // emitted to reducers so they might be preserved. Also, mkdirs does
505         // not currently return an error when the directory already exists;
506         // if this changes, all directory work might as well be done in reduce
507         return;
508       }
509 
510       // Can we skip copying this file ?
511       if (skipCopyFile(srcstat, absdst, outc, reporter)) {
512         return;
513       }
514 
515       Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
516       // do the actual copy to tmpfile
517       long bytesCopied = doCopyFile(srcstat, tmpfile, absdst, reporter);
518 
519       if (bytesCopied != srcstat.getLen()) {
520         throw new IOException("File size not matched: copied "
521             + bytesString(bytesCopied) + " to tmpfile (=" + tmpfile
522             + ") but expected " + bytesString(srcstat.getLen())
523             + " from " + srcstat.getPath());
524       }
525       else {
526         if (destFileSys.exists(absdst) &&
527             destFileSys.getFileStatus(absdst).isDirectory()) {
528           throw new IOException(absdst + " is a directory");
529         }
530         if (!destFileSys.mkdirs(absdst.getParent())) {
531           throw new IOException("Failed to create parent dir: " + absdst.getParent());
532         }
533         rename(tmpfile, absdst);
534 
535         if (!validateCopy(srcstat, absdst)) {
536           destFileSys.delete(absdst, false);
537           throw new IOException("Validation of copy of file "
538               + srcstat.getPath() + " failed.");
539         }
540         updateDestStatus(srcstat, destFileSys.getFileStatus(absdst));
541       }
542 
543       // report at least once for each file
544       updateCopyStatus(srcstat, reporter);
545     }
546 
547     /** rename tmp to dst, delete dst if already exists */
rename(Path tmp, Path dst)548     private void rename(Path tmp, Path dst) throws IOException {
549       try {
550         if (destFileSys.exists(dst)) {
551           destFileSys.delete(dst, true);
552         }
553         if (!destFileSys.rename(tmp, dst)) {
554           throw new IOException();
555         }
556       }
557       catch(IOException cause) {
558         throw (IOException)new IOException("Fail to rename tmp file (=" + tmp
559             + ") to destination file (=" + dst + ")").initCause(cause);
560       }
561     }
562 
updateDestStatus(FileStatus src, FileStatus dst )563     private void updateDestStatus(FileStatus src, FileStatus dst
564         ) throws IOException {
565       if (preserve_status) {
566         DistCpV1.updateDestStatus(src, dst, preseved, destFileSys);
567       }
568     }
569 
bytesString(long b)570     static String bytesString(long b) {
571       return b + " bytes (" +
572           TraditionalBinaryPrefix.long2String(b, "", 1) + ")";
573     }
574 
575     /**
576      * Copies a file and validates the copy by checking the checksums.
577      * If validation fails, retries (max number of tries is distcp.file.retries)
578      * to copy the file.
579      */
copyWithRetries(FileStatus srcstat, Path relativedst, OutputCollector<WritableComparable<?>, Text> out, Reporter reporter)580     void copyWithRetries(FileStatus srcstat, Path relativedst,
581                          OutputCollector<WritableComparable<?>, Text> out,
582                          Reporter reporter) throws IOException {
583 
584       // max tries to copy when validation of copy fails
585       final int maxRetries = job.getInt(FILE_RETRIES_LABEL, DEFAULT_FILE_RETRIES);
586       // save update flag for later copies within the same map task
587       final boolean saveUpdate = update;
588 
589       int retryCnt = 1;
590       for (; retryCnt <= maxRetries; retryCnt++) {
591         try {
592           //copy the file and validate copy
593           copy(srcstat, relativedst, out, reporter);
594           break;// copy successful
595         } catch (IOException e) {
596           LOG.warn("Copy of " + srcstat.getPath() + " failed.", e);
597           if (retryCnt < maxRetries) {// copy failed and need to retry
598             LOG.info("Retrying copy of file " + srcstat.getPath());
599             update = true; // set update flag for retries
600           }
601           else {// no more retries... Give up
602             update = saveUpdate;
603             throw new IOException("Copy of file failed even with " + retryCnt
604                                   + " tries.", e);
605           }
606         }
607       }
608     }
609 
610     /** Mapper configuration.
611      * Extracts source and destination file system, as well as
612      * top-level paths on source and destination directories.
613      * Gets the named file systems, to be used later in map.
614      */
configure(JobConf job)615     public void configure(JobConf job)
616     {
617       destPath = new Path(job.get(DST_DIR_LABEL, "/"));
618       try {
619         destFileSys = destPath.getFileSystem(job);
620       } catch (IOException ex) {
621         throw new RuntimeException("Unable to get the named file system.", ex);
622       }
623       sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
624       buffer = new byte[sizeBuf];
625       ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
626       preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
627       if (preserve_status) {
628         preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL));
629       }
630       update = job.getBoolean(Options.UPDATE.propertyname, false);
631       overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
632       skipCRCCheck = job.getBoolean(Options.SKIPCRC.propertyname, false);
633       this.job = job;
634     }
635 
636     /** Map method. Copies one file from source file system to destination.
637      * @param key src len
638      * @param value FilePair (FileStatus src, Path dst)
639      * @param out Log of failed copies
640      * @param reporter
641      */
map(LongWritable key, FilePair value, OutputCollector<WritableComparable<?>, Text> out, Reporter reporter)642     public void map(LongWritable key,
643                     FilePair value,
644                     OutputCollector<WritableComparable<?>, Text> out,
645                     Reporter reporter) throws IOException {
646       final FileStatus srcstat = value.input;
647       final Path relativedst = new Path(value.output);
648       try {
649         copyWithRetries(srcstat, relativedst, out, reporter);
650       } catch (IOException e) {
651         ++failcount;
652         reporter.incrCounter(Counter.FAIL, 1);
653         updateStatus(reporter);
654         final String sfailure = "FAIL " + relativedst + " : " +
655                           StringUtils.stringifyException(e);
656         out.collect(null, new Text(sfailure));
657         LOG.info(sfailure);
658         if (e instanceof FileNotFoundException) {
659           final String s = "Possible Cause for failure: Either the filesystem "
660                            + srcstat.getPath().getFileSystem(job)
661                            + " is not accessible or the file is deleted";
662           LOG.error(s);
663           out.collect(null, new Text(s));
664         }
665 
666         try {
667           for (int i = 0; i < 3; ++i) {
668             try {
669               final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst);
670               if (destFileSys.delete(tmp, true))
671                 break;
672             } catch (Throwable ex) {
673               // ignore, we are just cleaning up
674               LOG.debug("Ignoring cleanup exception", ex);
675             }
676             // update status, so we don't get timed out
677             updateStatus(reporter);
678             Thread.sleep(3 * 1000);
679           }
680         } catch (InterruptedException inte) {
681           throw (IOException)new IOException().initCause(inte);
682         }
683       } finally {
684         updateStatus(reporter);
685       }
686     }
687 
close()688     public void close() throws IOException {
689       if (0 == failcount || ignoreReadFailures) {
690         return;
691       }
692       throw new IOException(getCountString());
693     }
694   }
695 
fetchFileList(Configuration conf, Path srcList)696   private static List<Path> fetchFileList(Configuration conf, Path srcList)
697       throws IOException {
698     List<Path> result = new ArrayList<Path>();
699     FileSystem fs = srcList.getFileSystem(conf);
700     try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.open(srcList),
701             Charset.forName("UTF-8")))) {
702       String line = input.readLine();
703       while (line != null) {
704         result.add(new Path(line));
705         line = input.readLine();
706       }
707     }
708     return result;
709   }
710 
711   @Deprecated
copy(Configuration conf, String srcPath, String destPath, Path logPath, boolean srcAsList, boolean ignoreReadFailures)712   public static void copy(Configuration conf, String srcPath,
713                           String destPath, Path logPath,
714                           boolean srcAsList, boolean ignoreReadFailures)
715       throws IOException {
716     final Path src = new Path(srcPath);
717     List<Path> tmp = new ArrayList<Path>();
718     if (srcAsList) {
719       tmp.addAll(fetchFileList(conf, src));
720     } else {
721       tmp.add(src);
722     }
723     EnumSet<Options> flags = ignoreReadFailures
724       ? EnumSet.of(Options.IGNORE_READ_FAILURES)
725       : EnumSet.noneOf(Options.class);
726 
727     final Path dst = new Path(destPath);
728     copy(conf, new Arguments(tmp, null, dst, logPath, flags, null,
729         Long.MAX_VALUE, Long.MAX_VALUE, null, false));
730   }
731 
732   /** Sanity check for srcPath */
checkSrcPath(JobConf jobConf, List<Path> srcPaths)733   private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths)
734   throws IOException {
735     List<IOException> rslt = new ArrayList<IOException>();
736     List<Path> unglobbed = new LinkedList<Path>();
737 
738     Path[] ps = new Path[srcPaths.size()];
739     ps = srcPaths.toArray(ps);
740     TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);
741 
742 
743     for (Path p : srcPaths) {
744       FileSystem fs = p.getFileSystem(jobConf);
745       FileStatus[] inputs = fs.globStatus(p);
746 
747       if(inputs != null && inputs.length > 0) {
748         for (FileStatus onePath: inputs) {
749           unglobbed.add(onePath.getPath());
750         }
751       } else {
752         rslt.add(new IOException("Input source " + p + " does not exist."));
753       }
754     }
755     if (!rslt.isEmpty()) {
756       throw new InvalidInputException(rslt);
757     }
758     srcPaths.clear();
759     srcPaths.addAll(unglobbed);
760   }
761 
762   /**
763    * Driver to copy srcPath to destPath depending on required protocol.
764    * @param conf configuration
765    * @param args arguments
766    */
copy(final Configuration conf, final Arguments args )767   static void copy(final Configuration conf, final Arguments args
768       ) throws IOException {
769     LOG.info("srcPaths=" + args.srcs);
770     if (!args.dryrun || args.flags.contains(Options.UPDATE)) {
771       LOG.info("destPath=" + args.dst);
772     }
773 
774     JobConf job = createJobConf(conf);
775 
776     checkSrcPath(job, args.srcs);
777     if (args.preservedAttributes != null) {
778       job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
779     }
780     if (args.mapredSslConf != null) {
781       job.set("dfs.https.client.keystore.resource", args.mapredSslConf);
782     }
783 
784     //Initialize the mapper
785     try {
786       if (setup(conf, job, args)) {
787         JobClient.runJob(job);
788       }
789       if(!args.dryrun) {
790         finalize(conf, job, args.dst, args.preservedAttributes);
791       }
792     } finally {
793       if (!args.dryrun) {
794         //delete tmp
795         fullyDelete(job.get(TMP_DIR_LABEL), job);
796       }
797       //delete jobDirectory
798       fullyDelete(job.get(JOB_DIR_LABEL), job);
799     }
800   }
801 
updateDestStatus(FileStatus src, FileStatus dst, EnumSet<FileAttribute> preseved, FileSystem destFileSys )802   private static void updateDestStatus(FileStatus src, FileStatus dst,
803       EnumSet<FileAttribute> preseved, FileSystem destFileSys
804       ) throws IOException {
805     String owner = null;
806     String group = null;
807     if (preseved.contains(FileAttribute.USER)
808         && !src.getOwner().equals(dst.getOwner())) {
809       owner = src.getOwner();
810     }
811     if (preseved.contains(FileAttribute.GROUP)
812         && !src.getGroup().equals(dst.getGroup())) {
813       group = src.getGroup();
814     }
815     if (owner != null || group != null) {
816       destFileSys.setOwner(dst.getPath(), owner, group);
817     }
818     if (preseved.contains(FileAttribute.PERMISSION)
819         && !src.getPermission().equals(dst.getPermission())) {
820       destFileSys.setPermission(dst.getPath(), src.getPermission());
821     }
822     if (preseved.contains(FileAttribute.TIMES)) {
823       destFileSys.setTimes(dst.getPath(), src.getModificationTime(), src.getAccessTime());
824     }
825   }
826 
finalize(Configuration conf, JobConf jobconf, final Path destPath, String presevedAttributes)827   static private void finalize(Configuration conf, JobConf jobconf,
828       final Path destPath, String presevedAttributes) throws IOException {
829     if (presevedAttributes == null) {
830       return;
831     }
832     EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
833     if (!preseved.contains(FileAttribute.USER)
834         && !preseved.contains(FileAttribute.GROUP)
835         && !preseved.contains(FileAttribute.PERMISSION)) {
836       return;
837     }
838 
839     FileSystem dstfs = destPath.getFileSystem(conf);
840     Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
841     try (SequenceFile.Reader in =
842         new SequenceFile.Reader(jobconf, Reader.file(dstdirlist))) {
843       Text dsttext = new Text();
844       FilePair pair = new FilePair();
845       for(; in.next(dsttext, pair); ) {
846         Path absdst = new Path(destPath, pair.output);
847         updateDestStatus(pair.input, dstfs.getFileStatus(absdst),
848             preseved, dstfs);
849       }
850     }
851   }
852 
853   static class Arguments {
854     final List<Path> srcs;
855     final Path basedir;
856     final Path dst;
857     final Path log;
858     final EnumSet<Options> flags;
859     final String preservedAttributes;
860     final long filelimit;
861     final long sizelimit;
862     final String mapredSslConf;
863     final boolean dryrun;
864 
865     /**
866      * Arguments for distcp
867      * @param srcs List of source paths
868      * @param basedir Base directory for copy
869      * @param dst Destination path
870      * @param log Log output directory
871      * @param flags Command-line flags
872      * @param preservedAttributes Preserved attributes
873      * @param filelimit File limit
874      * @param sizelimit Size limit
875      * @param mapredSslConf ssl configuration
876      * @param dryrun
877      */
Arguments(List<Path> srcs, Path basedir, Path dst, Path log, EnumSet<Options> flags, String preservedAttributes, long filelimit, long sizelimit, String mapredSslConf, boolean dryrun)878     Arguments(List<Path> srcs, Path basedir, Path dst, Path log,
879         EnumSet<Options> flags, String preservedAttributes,
880         long filelimit, long sizelimit, String mapredSslConf,
881         boolean dryrun) {
882       this.srcs = srcs;
883       this.basedir = basedir;
884       this.dst = dst;
885       this.log = log;
886       this.flags = flags;
887       this.preservedAttributes = preservedAttributes;
888       this.filelimit = filelimit;
889       this.sizelimit = sizelimit;
890       this.mapredSslConf = mapredSslConf;
891       this.dryrun = dryrun;
892 
893       if (LOG.isTraceEnabled()) {
894         LOG.trace("this = " + this);
895       }
896     }
897 
valueOf(String[] args, Configuration conf )898     static Arguments valueOf(String[] args, Configuration conf
899         ) throws IOException {
900       List<Path> srcs = new ArrayList<Path>();
901       Path dst = null;
902       Path log = null;
903       Path basedir = null;
904       EnumSet<Options> flags = EnumSet.noneOf(Options.class);
905       String presevedAttributes = null;
906       String mapredSslConf = null;
907       long filelimit = Long.MAX_VALUE;
908       long sizelimit = Long.MAX_VALUE;
909       boolean dryrun = false;
910 
911       for (int idx = 0; idx < args.length; idx++) {
912         Options[] opt = Options.values();
913         int i = 0;
914         for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++);
915 
916         if (i < opt.length) {
917           flags.add(opt[i]);
918           if (opt[i] == Options.PRESERVE_STATUS) {
919             presevedAttributes =  args[idx].substring(2);
920             FileAttribute.parse(presevedAttributes); //validation
921           }
922           else if (opt[i] == Options.FILE_LIMIT) {
923             filelimit = Options.FILE_LIMIT.parseLong(args, ++idx);
924           }
925           else if (opt[i] == Options.SIZE_LIMIT) {
926             sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx);
927           }
928         } else if ("-f".equals(args[idx])) {
929           if (++idx ==  args.length) {
930             throw new IllegalArgumentException("urilist_uri not specified in -f");
931           }
932           srcs.addAll(fetchFileList(conf, new Path(args[idx])));
933         } else if ("-log".equals(args[idx])) {
934           if (++idx ==  args.length) {
935             throw new IllegalArgumentException("logdir not specified in -log");
936           }
937           log = new Path(args[idx]);
938         } else if ("-basedir".equals(args[idx])) {
939           if (++idx ==  args.length) {
940             throw new IllegalArgumentException("basedir not specified in -basedir");
941           }
942           basedir = new Path(args[idx]);
943         } else if ("-mapredSslConf".equals(args[idx])) {
944           if (++idx ==  args.length) {
945             throw new IllegalArgumentException("ssl conf file not specified in -mapredSslConf");
946           }
947           mapredSslConf = args[idx];
948         } else if ("-dryrun".equals(args[idx])) {
949           dryrun = true;
950           dst = new Path("/tmp/distcp_dummy_dest");//dummy destination
951         } else if ("-m".equals(args[idx])) {
952           if (++idx == args.length) {
953             throw new IllegalArgumentException("num_maps not specified in -m");
954           }
955           try {
956             conf.setInt(MAX_MAPS_LABEL, Integer.parseInt(args[idx]));
957           } catch (NumberFormatException e) {
958             throw new IllegalArgumentException("Invalid argument to -m: " +
959                                                args[idx]);
960           }
961         } else if ('-' == args[idx].codePointAt(0)) {
962           throw new IllegalArgumentException("Invalid switch " + args[idx]);
963         } else if (idx == args.length -1 &&
964                    (!dryrun || flags.contains(Options.UPDATE))) {
965           dst = new Path(args[idx]);
966         } else {
967           srcs.add(new Path(args[idx]));
968         }
969       }
970       // mandatory command-line parameters
971       if (srcs.isEmpty() || dst == null) {
972         throw new IllegalArgumentException("Missing "
973             + (dst == null ? "dst path" : "src"));
974       }
975       // incompatible command-line flags
976       final boolean isOverwrite = flags.contains(Options.OVERWRITE);
977       final boolean isUpdate = flags.contains(Options.UPDATE);
978       final boolean isDelete = flags.contains(Options.DELETE);
979       final boolean skipCRC = flags.contains(Options.SKIPCRC);
980       if (isOverwrite && isUpdate) {
981         throw new IllegalArgumentException("Conflicting overwrite policies");
982       }
983       if (!isUpdate && skipCRC) {
984         throw new IllegalArgumentException(
985             Options.SKIPCRC.cmd + " is relevant only with the " +
986             Options.UPDATE.cmd + " option");
987       }
988       if (isDelete && !isOverwrite && !isUpdate) {
989         throw new IllegalArgumentException(Options.DELETE.cmd
990             + " must be specified with " + Options.OVERWRITE + " or "
991             + Options.UPDATE + ".");
992       }
993       return new Arguments(srcs, basedir, dst, log, flags, presevedAttributes,
994           filelimit, sizelimit, mapredSslConf, dryrun);
995     }
996 
997     /** {@inheritDoc} */
toString()998     public String toString() {
999       return getClass().getName() + "{"
1000           + "\n  srcs = " + srcs
1001           + "\n  dst = " + dst
1002           + "\n  log = " + log
1003           + "\n  flags = " + flags
1004           + "\n  preservedAttributes = " + preservedAttributes
1005           + "\n  filelimit = " + filelimit
1006           + "\n  sizelimit = " + sizelimit
1007           + "\n  mapredSslConf = " + mapredSslConf
1008           + "\n}";
1009     }
1010   }
1011 
1012   /**
1013    * This is the main driver for recursively copying directories
1014    * across file systems. It takes at least two cmdline parameters. A source
1015    * URL and a destination URL. It then essentially does an "ls -lR" on the
1016    * source URL, and writes the output in a round-robin manner to all the map
1017    * input files. The mapper actually copies the files allotted to it. The
1018    * reduce is empty.
1019    */
run(String[] args)1020   public int run(String[] args) {
1021     try {
1022       copy(conf, Arguments.valueOf(args, conf));
1023       return 0;
1024     } catch (IllegalArgumentException e) {
1025       System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
1026       ToolRunner.printGenericCommandUsage(System.err);
1027       return -1;
1028     } catch (DuplicationException e) {
1029       System.err.println(StringUtils.stringifyException(e));
1030       return DuplicationException.ERROR_CODE;
1031     } catch (RemoteException e) {
1032       final IOException unwrapped = e.unwrapRemoteException(
1033           FileNotFoundException.class,
1034           AccessControlException.class,
1035           QuotaExceededException.class);
1036       System.err.println(StringUtils.stringifyException(unwrapped));
1037       return -3;
1038     } catch (Exception e) {
1039       System.err.println("With failures, global counters are inaccurate; " +
1040           "consider running with -i");
1041       System.err.println("Copy failed: " + StringUtils.stringifyException(e));
1042       return -999;
1043     }
1044   }
1045 
main(String[] args)1046   public static void main(String[] args) throws Exception {
1047     JobConf job = new JobConf(DistCpV1.class);
1048     DistCpV1 distcp = new DistCpV1(job);
1049     int res = ToolRunner.run(distcp, args);
1050     System.exit(res);
1051   }
1052 
1053   /**
1054    * Make a path relative with respect to a root path.
1055    * absPath is always assumed to descend from root.
1056    * Otherwise returned path is null.
1057    */
makeRelative(Path root, Path absPath)1058   static String makeRelative(Path root, Path absPath) {
1059     if (!absPath.isAbsolute()) {
1060       throw new IllegalArgumentException("!absPath.isAbsolute(), absPath="
1061           + absPath);
1062     }
1063     String p = absPath.toUri().getPath();
1064 
1065     StringTokenizer pathTokens = new StringTokenizer(p, "/");
1066     for(StringTokenizer rootTokens = new StringTokenizer(
1067         root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) {
1068       if (!rootTokens.nextToken().equals(pathTokens.nextToken())) {
1069         return null;
1070       }
1071     }
1072     StringBuilder sb = new StringBuilder();
1073     for(; pathTokens.hasMoreTokens(); ) {
1074       sb.append(pathTokens.nextToken());
1075       if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); }
1076     }
1077     return sb.length() == 0? ".": sb.toString();
1078   }
1079 
1080   /**
1081    * Calculate how many maps to run.
1082    * Number of maps is bounded by a minimum of the cumulative size of the
1083    * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the
1084    * command line) and at most (distcp.max.map.tasks, default
1085    * MAX_MAPS_PER_NODE * nodes in the cluster).
1086    * @param totalBytes Count of total bytes for job
1087    * @param job The job to configure
1088    * @return Count of maps to run.
1089    */
setMapCount(long totalBytes, JobConf job)1090   private static int setMapCount(long totalBytes, JobConf job)
1091       throws IOException {
1092     int numMaps =
1093       (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP));
1094     numMaps = Math.min(numMaps,
1095         job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE *
1096           new JobClient(job).getClusterStatus().getTaskTrackers()));
1097     numMaps = Math.max(numMaps, 1);
1098     job.setNumMapTasks(numMaps);
1099     return numMaps;
1100   }
1101 
1102   /** Fully delete dir */
fullyDelete(String dir, Configuration conf)1103   static void fullyDelete(String dir, Configuration conf) throws IOException {
1104     if (dir != null) {
1105       Path tmp = new Path(dir);
1106       boolean success = tmp.getFileSystem(conf).delete(tmp, true);
1107       if (!success) {
1108         LOG.warn("Could not fully delete " + tmp);
1109       }
1110     }
1111   }
1112 
1113   //Job configuration
createJobConf(Configuration conf)1114   private static JobConf createJobConf(Configuration conf) {
1115     JobConf jobconf = new JobConf(conf, DistCpV1.class);
1116     jobconf.setJobName(conf.get("mapred.job.name", NAME));
1117 
1118     // turn off speculative execution, because DFS doesn't handle
1119     // multiple writers to the same file.
1120     jobconf.setMapSpeculativeExecution(false);
1121 
1122     jobconf.setInputFormat(CopyInputFormat.class);
1123     jobconf.setOutputKeyClass(Text.class);
1124     jobconf.setOutputValueClass(Text.class);
1125 
1126     jobconf.setMapperClass(CopyFilesMapper.class);
1127     jobconf.setNumReduceTasks(0);
1128     return jobconf;
1129   }
1130 
1131   private static final Random RANDOM = new Random();
getRandomId()1132   public static String getRandomId() {
1133     return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
1134   }
1135 
1136   /**
1137    * Increase the replication factor of _distcp_src_files to
1138    * sqrt(min(maxMapsOnCluster, numMaps)). This is to reduce the chance of
1139    * failing of distcp because of "not having a replication of _distcp_src_files
1140    * available for reading for some maps".
1141    */
setReplication(Configuration conf, JobConf jobConf, Path srcfilelist, int numMaps)1142   private static void setReplication(Configuration conf, JobConf jobConf,
1143                          Path srcfilelist, int numMaps) throws IOException {
1144     int numMaxMaps = new JobClient(jobConf).getClusterStatus().getMaxMapTasks();
1145     short replication = (short) Math.ceil(
1146                                 Math.sqrt(Math.min(numMaxMaps, numMaps)));
1147     FileSystem fs = srcfilelist.getFileSystem(conf);
1148     FileStatus srcStatus = fs.getFileStatus(srcfilelist);
1149 
1150     if (srcStatus.getReplication() < replication) {
1151       if (!fs.setReplication(srcfilelist, replication)) {
1152         throw new IOException("Unable to increase the replication of file " +
1153                               srcfilelist);
1154       }
1155     }
1156   }
1157 
1158   /**
1159    * Does the dir already exist at destination ?
1160    * @return true   if the dir already exists at destination
1161    */
dirExists(Configuration conf, Path dst)1162   private static boolean dirExists(Configuration conf, Path dst)
1163                  throws IOException {
1164     FileSystem destFileSys = dst.getFileSystem(conf);
1165     FileStatus status = null;
1166     try {
1167       status = destFileSys.getFileStatus(dst);
1168     }catch (FileNotFoundException e) {
1169       return false;
1170     }
1171     if (status.isFile()) {
1172       throw new FileAlreadyExistsException("Not a dir: " + dst+" is a file.");
1173     }
1174     return true;
1175   }
1176 
1177   /**
1178    * Initialize DFSCopyFileMapper specific job-configuration.
1179    * @param conf : The dfs/mapred configuration.
1180    * @param jobConf : The handle to the jobConf object to be initialized.
1181    * @param args Arguments
1182    * @return true if it is necessary to launch a job.
1183    */
setup(Configuration conf, JobConf jobConf, final Arguments args)1184   static boolean setup(Configuration conf, JobConf jobConf,
1185                             final Arguments args)
1186       throws IOException {
1187     jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString());
1188 
1189     //set boolean values
1190     final boolean update = args.flags.contains(Options.UPDATE);
1191     final boolean skipCRCCheck = args.flags.contains(Options.SKIPCRC);
1192     final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE)
1193                               && !args.dryrun;
1194     jobConf.setBoolean(Options.UPDATE.propertyname, update);
1195     jobConf.setBoolean(Options.SKIPCRC.propertyname, skipCRCCheck);
1196     jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
1197     jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
1198         args.flags.contains(Options.IGNORE_READ_FAILURES));
1199     jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
1200         args.flags.contains(Options.PRESERVE_STATUS));
1201 
1202     final String randomId = getRandomId();
1203     JobClient jClient = new JobClient(jobConf);
1204     Path stagingArea;
1205     try {
1206       stagingArea =
1207         JobSubmissionFiles.getStagingDir(jClient.getClusterHandle(), conf);
1208     } catch (InterruptedException ie) {
1209       throw new IOException(ie);
1210     }
1211 
1212     Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId);
1213     FsPermission mapredSysPerms =
1214       new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
1215     FileSystem.mkdirs(jClient.getFs(), jobDirectory, mapredSysPerms);
1216     jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
1217 
1218     long maxBytesPerMap = conf.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP);
1219 
1220     FileSystem dstfs = args.dst.getFileSystem(conf);
1221 
1222     // get tokens for all the required FileSystems..
1223     TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(),
1224                                         new Path[] {args.dst}, conf);
1225 
1226 
1227     boolean dstExists = dstfs.exists(args.dst);
1228     boolean dstIsDir = false;
1229     if (dstExists) {
1230       dstIsDir = dstfs.getFileStatus(args.dst).isDirectory();
1231     }
1232 
1233     // default logPath
1234     Path logPath = args.log;
1235     if (logPath == null) {
1236       String filename = "_distcp_logs_" + randomId;
1237       if (!dstExists || !dstIsDir) {
1238         Path parent = args.dst.getParent();
1239         if (null == parent) {
1240           // If dst is '/' on S3, it might not exist yet, but dst.getParent()
1241           // will return null. In this case, use '/' as its own parent to prevent
1242           // NPE errors below.
1243           parent = args.dst;
1244         }
1245         if (!dstfs.exists(parent)) {
1246           dstfs.mkdirs(parent);
1247         }
1248         logPath = new Path(parent, filename);
1249       } else {
1250         logPath = new Path(args.dst, filename);
1251       }
1252     }
1253     FileOutputFormat.setOutputPath(jobConf, logPath);
1254 
1255     // create src list, dst list
1256     FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
1257 
1258     Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
1259     Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
1260     Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
1261     jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
1262     jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
1263     int srcCount = 0, cnsyncf = 0, dirsyn = 0;
1264     long fileCount = 0L, dirCount = 0L, byteCount = 0L, cbsyncs = 0L,
1265          skipFileCount = 0L, skipByteCount = 0L;
1266     try (
1267         SequenceFile.Writer src_writer = SequenceFile.createWriter(jobConf,
1268             Writer.file(srcfilelist), Writer.keyClass(LongWritable.class),
1269             Writer.valueClass(FilePair.class), Writer.compression(
1270             SequenceFile.CompressionType.NONE));
1271         SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobConf,
1272             Writer.file(dstfilelist), Writer.keyClass(Text.class),
1273             Writer.valueClass(Text.class), Writer.compression(
1274             SequenceFile.CompressionType.NONE));
1275         SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobConf,
1276             Writer.file(dstdirlist), Writer.keyClass(Text.class),
1277             Writer.valueClass(FilePair.class), Writer.compression(
1278             SequenceFile.CompressionType.NONE));
1279     ) {
1280       // handle the case where the destination directory doesn't exist
1281       // and we've only a single src directory OR we're updating/overwriting
1282       // the contents of the destination directory.
1283       final boolean special =
1284         (args.srcs.size() == 1 && !dstExists) || update || overwrite;
1285 
1286       Path basedir = null;
1287       HashSet<Path> parentDirsToCopy = new HashSet<Path>();
1288       if (args.basedir != null) {
1289         FileSystem basefs = args.basedir.getFileSystem(conf);
1290         basedir = args.basedir.makeQualified(
1291             basefs.getUri(), basefs.getWorkingDirectory());
1292         if (!basefs.isDirectory(basedir)) {
1293           throw new IOException("Basedir " + basedir + " is not a directory.");
1294         }
1295       }
1296 
1297       for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
1298         final Path src = srcItr.next();
1299         FileSystem srcfs = src.getFileSystem(conf);
1300         FileStatus srcfilestat = srcfs.getFileStatus(src);
1301         Path root = special && srcfilestat.isDirectory()? src: src.getParent();
1302         if (dstExists && !dstIsDir &&
1303             (args.srcs.size() > 1 || srcfilestat.isDirectory())) {
1304           // destination should not be a file
1305           throw new IOException("Destination " + args.dst + " should be a dir" +
1306                                 " if multiple source paths are there OR if" +
1307                                 " the source path is a dir");
1308         }
1309 
1310         if (basedir != null) {
1311           root = basedir;
1312           Path parent = src.getParent().makeQualified(
1313               srcfs.getUri(), srcfs.getWorkingDirectory());
1314           while (parent != null && !parent.equals(basedir)) {
1315             if (!parentDirsToCopy.contains(parent)){
1316               parentDirsToCopy.add(parent);
1317               String dst = makeRelative(root, parent);
1318               FileStatus pst = srcfs.getFileStatus(parent);
1319               src_writer.append(new LongWritable(0), new FilePair(pst, dst));
1320               dst_writer.append(new Text(dst), new Text(parent.toString()));
1321               dir_writer.append(new Text(dst), new FilePair(pst, dst));
1322               if (++dirsyn > SYNC_FILE_MAX) {
1323                 dirsyn = 0;
1324                 dir_writer.sync();
1325               }
1326             }
1327             parent = parent.getParent();
1328           }
1329 
1330           if (parent == null) {
1331             throw new IOException("Basedir " + basedir +
1332                 " is not a prefix of source path " + src);
1333           }
1334         }
1335 
1336         if (srcfilestat.isDirectory()) {
1337           ++srcCount;
1338           final String dst = makeRelative(root,src);
1339           if (!update || !dirExists(conf, new Path(args.dst, dst))) {
1340             ++dirCount;
1341             src_writer.append(new LongWritable(0),
1342                               new FilePair(srcfilestat, dst));
1343           }
1344           dst_writer.append(new Text(dst), new Text(src.toString()));
1345         }
1346 
1347         Stack<FileStatus> pathstack = new Stack<FileStatus>();
1348         for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
1349           FileStatus cur = pathstack.pop();
1350           FileStatus[] children = srcfs.listStatus(cur.getPath());
1351           for(int i = 0; i < children.length; i++) {
1352             boolean skipPath = false;
1353             final FileStatus child = children[i];
1354             final String dst = makeRelative(root, child.getPath());
1355             ++srcCount;
1356 
1357             if (child.isDirectory()) {
1358               pathstack.push(child);
1359               if (!update || !dirExists(conf, new Path(args.dst, dst))) {
1360                 ++dirCount;
1361               }
1362               else {
1363                 skipPath = true; // skip creating dir at destination
1364               }
1365             }
1366             else {
1367               Path destPath = new Path(args.dst, dst);
1368               if (cur.isFile() && (args.srcs.size() == 1)) {
1369                 // Copying a single file; use dst path provided by user as
1370                 // destination file rather than destination directory
1371                 Path dstparent = destPath.getParent();
1372                 FileSystem destFileSys = destPath.getFileSystem(jobConf);
1373                 if (!(destFileSys.exists(dstparent) &&
1374                     destFileSys.getFileStatus(dstparent).isDirectory())) {
1375                   destPath = dstparent;
1376                 }
1377               }
1378               //skip path if the src and the dst files are the same.
1379               skipPath = update &&
1380               	sameFile(srcfs, child, dstfs, destPath, skipCRCCheck);
1381               //skip path if it exceed file limit or size limit
1382               skipPath |= fileCount == args.filelimit
1383                           || byteCount + child.getLen() > args.sizelimit;
1384 
1385               if (!skipPath) {
1386                 ++fileCount;
1387                 byteCount += child.getLen();
1388 
1389                 if (LOG.isTraceEnabled()) {
1390                   LOG.trace("adding file " + child.getPath());
1391                 }
1392 
1393                 ++cnsyncf;
1394                 cbsyncs += child.getLen();
1395                 if (cnsyncf > SYNC_FILE_MAX || cbsyncs > maxBytesPerMap) {
1396                   src_writer.sync();
1397                   dst_writer.sync();
1398                   cnsyncf = 0;
1399                   cbsyncs = 0L;
1400                 }
1401               }
1402               else {
1403                 ++skipFileCount;
1404                 skipByteCount += child.getLen();
1405                 if (LOG.isTraceEnabled()) {
1406                   LOG.trace("skipping file " + child.getPath());
1407                 }
1408               }
1409             }
1410 
1411             if (!skipPath) {
1412               src_writer.append(new LongWritable(child.isDirectory()? 0: child.getLen()),
1413                   new FilePair(child, dst));
1414             }
1415 
1416             dst_writer.append(new Text(dst),
1417                 new Text(child.getPath().toString()));
1418           }
1419 
1420           if (cur.isDirectory()) {
1421             String dst = makeRelative(root, cur.getPath());
1422             dir_writer.append(new Text(dst), new FilePair(cur, dst));
1423             if (++dirsyn > SYNC_FILE_MAX) {
1424               dirsyn = 0;
1425               dir_writer.sync();
1426             }
1427           }
1428         }
1429       }
1430     }
1431     LOG.info("sourcePathsCount(files+directories)=" + srcCount);
1432     LOG.info("filesToCopyCount=" + fileCount);
1433     LOG.info("bytesToCopyCount=" +
1434              TraditionalBinaryPrefix.long2String(byteCount, "", 1));
1435     if (update) {
1436       LOG.info("filesToSkipCopyCount=" + skipFileCount);
1437       LOG.info("bytesToSkipCopyCount=" +
1438                TraditionalBinaryPrefix.long2String(skipByteCount, "", 1));
1439     }
1440     if (args.dryrun) {
1441       return false;
1442     }
1443     int mapCount = setMapCount(byteCount, jobConf);
1444     // Increase the replication of _distcp_src_files, if needed
1445     setReplication(conf, jobConf, srcfilelist, mapCount);
1446 
1447     FileStatus dststatus = null;
1448     try {
1449       dststatus = dstfs.getFileStatus(args.dst);
1450     } catch(FileNotFoundException fnfe) {
1451       LOG.info(args.dst + " does not exist.");
1452     }
1453 
1454     // create dest path dir if copying > 1 file
1455     if (dststatus == null) {
1456       if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
1457         throw new IOException("Failed to create" + args.dst);
1458       }
1459     }
1460 
1461     final Path sorted = new Path(jobDirectory, "_distcp_sorted");
1462     checkDuplication(jobfs, dstfilelist, sorted, conf);
1463 
1464     if (dststatus != null && args.flags.contains(Options.DELETE)) {
1465       long deletedPathsCount = deleteNonexisting(dstfs, dststatus, sorted,
1466           jobfs, jobDirectory, jobConf, conf);
1467       LOG.info("deletedPathsFromDestCount(files+directories)=" +
1468                deletedPathsCount);
1469     }
1470 
1471     Path tmpDir = new Path(
1472         (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
1473         args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
1474     jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
1475 
1476     // Explicitly create the tmpDir to ensure that it can be cleaned
1477     // up by fullyDelete() later.
1478     tmpDir.getFileSystem(conf).mkdirs(tmpDir);
1479 
1480     LOG.info("sourcePathsCount=" + srcCount);
1481     LOG.info("filesToCopyCount=" + fileCount);
1482     LOG.info("bytesToCopyCount=" +
1483              TraditionalBinaryPrefix.long2String(byteCount, "", 1));
1484     jobConf.setInt(SRC_COUNT_LABEL, srcCount);
1485     jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
1486 
1487     return (fileCount + dirCount) > 0;
1488   }
1489 
1490   /**
1491    * Check whether the contents of src and dst are the same.
1492    *
1493    * Return false if dstpath does not exist
1494    *
1495    * If the files have different sizes, return false.
1496    *
1497    * If the files have the same sizes, the file checksums will be compared.
1498    *
1499    * When file checksum is not supported in any of file systems,
1500    * two files are considered as the same if they have the same size.
1501    */
sameFile(FileSystem srcfs, FileStatus srcstatus, FileSystem dstfs, Path dstpath, boolean skipCRCCheck)1502   static private boolean sameFile(FileSystem srcfs, FileStatus srcstatus,
1503       FileSystem dstfs, Path dstpath, boolean skipCRCCheck) throws IOException {
1504     FileStatus dststatus;
1505     try {
1506       dststatus = dstfs.getFileStatus(dstpath);
1507     } catch(FileNotFoundException fnfe) {
1508       return false;
1509     }
1510 
1511     //same length?
1512     if (srcstatus.getLen() != dststatus.getLen()) {
1513       return false;
1514     }
1515 
1516     if (skipCRCCheck) {
1517       LOG.debug("Skipping the CRC check");
1518       return true;
1519     }
1520 
1521     //get src checksum
1522     final FileChecksum srccs;
1523     try {
1524       srccs = srcfs.getFileChecksum(srcstatus.getPath());
1525     } catch(FileNotFoundException fnfe) {
1526       /*
1527        * Two possible cases:
1528        * (1) src existed once but was deleted between the time period that
1529        *     srcstatus was obtained and the try block above.
1530        * (2) srcfs does not support file checksum and (incorrectly) throws
1531        *     FNFE, e.g. some previous versions of HftpFileSystem.
1532        * For case (1), it is okay to return true since src was already deleted.
1533        * For case (2), true should be returned.
1534        */
1535       return true;
1536     }
1537 
1538     //compare checksums
1539     try {
1540       final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath());
1541       //return true if checksum is not supported
1542       //(i.e. some of the checksums is null)
1543       return srccs == null || dstcs == null || srccs.equals(dstcs);
1544     } catch(FileNotFoundException fnfe) {
1545       return false;
1546     }
1547   }
1548 
1549   /**
1550    * Delete the dst files/dirs which do not exist in src
1551    *
1552    * @return total count of files and directories deleted from destination
1553    * @throws IOException
1554    */
deleteNonexisting( FileSystem dstfs, FileStatus dstroot, Path dstsorted, FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf )1555   static private long deleteNonexisting(
1556       FileSystem dstfs, FileStatus dstroot, Path dstsorted,
1557       FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
1558       ) throws IOException {
1559     if (dstroot.isFile()) {
1560       throw new IOException("dst must be a directory when option "
1561           + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
1562           + ") is not a directory.");
1563     }
1564 
1565     //write dst lsr results
1566     final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
1567     try (final SequenceFile.Writer writer = SequenceFile.createWriter(jobconf,
1568         Writer.file(dstlsr), Writer.keyClass(Text.class),
1569         Writer.valueClass(NullWritable.class), Writer.compression(
1570         SequenceFile.CompressionType.NONE))) {
1571       //do lsr to get all file statuses in dstroot
1572       final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
1573       for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
1574         final FileStatus status = lsrstack.pop();
1575         if (status.isDirectory()) {
1576           for(FileStatus child : dstfs.listStatus(status.getPath())) {
1577             String relative = makeRelative(dstroot.getPath(), child.getPath());
1578             writer.append(new Text(relative), NullWritable.get());
1579             lsrstack.push(child);
1580           }
1581         }
1582       }
1583     }
1584 
1585     //sort lsr results
1586     final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
1587     SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
1588         new Text.Comparator(), Text.class, NullWritable.class, jobconf);
1589     sorter.sort(dstlsr, sortedlsr);
1590 
1591     //compare lsr list and dst list
1592     long deletedPathsCount = 0;
1593     try (SequenceFile.Reader lsrin =
1594              new SequenceFile.Reader(jobconf, Reader.file(sortedlsr));
1595          SequenceFile.Reader  dstin =
1596              new SequenceFile.Reader(jobconf, Reader.file(dstsorted))) {
1597       //compare sorted lsr list and sorted dst list
1598       final Text lsrpath = new Text();
1599       final Text dstpath = new Text();
1600       final Text dstfrom = new Text();
1601       final Trash trash = new Trash(dstfs, conf);
1602       Path lastpath = null;
1603 
1604       boolean hasnext = dstin.next(dstpath, dstfrom);
1605       while (lsrin.next(lsrpath, NullWritable.get())) {
1606         int dst_cmp_lsr = dstpath.compareTo(lsrpath);
1607         while (hasnext && dst_cmp_lsr < 0) {
1608           hasnext = dstin.next(dstpath, dstfrom);
1609           dst_cmp_lsr = dstpath.compareTo(lsrpath);
1610         }
1611 
1612         if (dst_cmp_lsr == 0) {
1613           //lsrpath exists in dst, skip it
1614           hasnext = dstin.next(dstpath, dstfrom);
1615         } else {
1616           //lsrpath does not exist, delete it
1617           final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString());
1618           ++deletedPathsCount;
1619           if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) {
1620             if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) {
1621               throw new IOException("Failed to delete " + rmpath);
1622             }
1623             lastpath = rmpath;
1624           }
1625         }
1626       }
1627     }
1628     return deletedPathsCount;
1629   }
1630 
1631   //is x an ancestor path of y?
isAncestorPath(Path xp, Path yp)1632   static private boolean isAncestorPath(Path xp, Path yp) {
1633     final String x = xp.toString();
1634     final String y = yp.toString();
1635     if (!y.startsWith(x)) {
1636       return false;
1637     }
1638     final int len = x.length();
1639     return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR;
1640   }
1641 
1642   /** Check whether the file list have duplication. */
checkDuplication(FileSystem fs, Path file, Path sorted, Configuration conf)1643   static private void checkDuplication(FileSystem fs, Path file, Path sorted,
1644     Configuration conf) throws IOException {
1645     SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
1646       new Text.Comparator(), Text.class, Text.class, conf);
1647     sorter.sort(file, sorted);
1648     try (SequenceFile.Reader in =
1649          new SequenceFile.Reader(conf, Reader.file(sorted))) {
1650       Text prevdst = null, curdst = new Text();
1651       Text prevsrc = null, cursrc = new Text();
1652       for(; in.next(curdst, cursrc); ) {
1653         if (prevdst != null && curdst.equals(prevdst)) {
1654           throw new DuplicationException(
1655             "Invalid input, there are duplicated files in the sources: "
1656             + prevsrc + ", " + cursrc);
1657         }
1658         prevdst = curdst;
1659         curdst = new Text();
1660         prevsrc = cursrc;
1661         cursrc = new Text();
1662       }
1663     }
1664   }
1665 
1666   /** An exception class for duplicated source files. */
1667   public static class DuplicationException extends IOException {
1668     private static final long serialVersionUID = 1L;
1669     /** Error code for this exception */
1670     public static final int ERROR_CODE = -2;
DuplicationException(String message)1671     DuplicationException(String message) {super(message);}
1672   }
1673 }
1674