/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.tools;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Stack;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A Map-reduce program to recursively copy directories between
 * different file-systems.
 */
public class DistCp implements Tool {
  public static final Log LOG = LogFactory.getLog(DistCp.class);

  private static final String NAME = "distcp";

  private static final String usage = NAME
    + " [OPTIONS] <srcurl>* <desturl>" +
    "\n\nOPTIONS:" +
    "\n-p[rbugp]              Preserve status" +
    "\n                       r: replication number" +
    "\n                       b: block size" +
    "\n                       u: user" +
    "\n                       g: group" +
    "\n                       p: permission" +
    "\n                       -p alone is equivalent to -prbugp" +
    "\n-i                     Ignore failures" +
    "\n-log <logdir>          Write logs to <logdir>" +
    "\n-m <num_maps>          Maximum number of simultaneous copies" +
    "\n-overwrite             Overwrite destination" +
    "\n-update                Overwrite if src size different from dst size" +
    "\n-skipcrccheck          Do not use CRC check to determine if src is " +
    "\n                       different from dest. Relevant only if -update" +
    "\n                       is specified" +
    "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
    "\n-filelimit <n>         Limit the total number of files to be <= n" +
    "\n-sizelimit <n>         Limit the total size to be <= n bytes" +
    "\n-delete                Delete the files existing in the dst but not in src" +
    "\n-mapredSslConf <f>     Filename of SSL configuration for mapper task" +

    "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
    "\n      interpreted as an isomorphic update to an existing directory." +
    "\nFor example:" +
    "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
    "\"hdfs://B:8020/user/foo/baz\"\n" +
    "\n     would update all descendants of 'baz' also in 'bar'; it would " +
    "\n     *not* update /user/foo/baz/bar" +

    "\n\nNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " +
    "\n     specified with symbolic representation.  For example," +
    "\n       1230k = 1230 * 1024 = 1259520" +
    "\n       891g = 891 * 1024^3 = 956703965184" +

    "\n";

  private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
  private static final int MAX_MAPS_PER_NODE = 20;
  private static final int SYNC_FILE_MAX = 10;

  static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
  static enum Options {
    DELETE("-delete", NAME + ".delete"),
    FILE_LIMIT("-filelimit", NAME + ".limit.file"),
    SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
    IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
    PRESERVE_STATUS("-p", NAME + ".preserve.status"),
    OVERWRITE("-overwrite", NAME + ".overwrite.always"),
    UPDATE("-update", NAME + ".overwrite.ifnewer"),
    SKIPCRC("-skipcrccheck", NAME + ".skip.crc.check");

    final String cmd, propertyname;

    private Options(String cmd, String propertyname) {
      this.cmd = cmd;
      this.propertyname = propertyname;
    }

    private long parseLong(String[] args, int offset) {
      if (offset ==  args.length) {
        throw new IllegalArgumentException("<n> not specified in " + cmd);
      }
      long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]);
      if (n <= 0) {
        throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd);
      }
      return n;
    }
  }
  static enum FileAttribute {
    BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;

    final char symbol;

    private FileAttribute() {symbol = toString().toLowerCase().charAt(0);}

    static EnumSet<FileAttribute> parse(String s) {
      if (s == null || s.length() == 0) {
        return EnumSet.allOf(FileAttribute.class);
      }

      EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class);
      FileAttribute[] attributes = values();
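      // Match each character against the attributes' single-letter symbols;
      // duplicate or unknown letters are rejected.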
      for(char c : s.toCharArray()) {
        int i = 0;
        for(; i < attributes.length && c != attributes[i].symbol; i++);
        if (i < attributes.length) {
          if (!set.contains(attributes[i])) {
            set.add(attributes[i]);
          } else {
            throw new IllegalArgumentException("There are more than one '"
                + attributes[i].symbol + "' in " + s);
          }
        } else {
          throw new IllegalArgumentException("'" + c + "' in " + s
              + " is undefined.");
        }
      }
      return set;
    }
  }

  static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
  static final String DST_DIR_LABEL = NAME + ".dest.path";
  static final String JOB_DIR_LABEL = NAME + ".job.dir";
  static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks";
  static final String SRC_LIST_LABEL = NAME + ".src.list";
  static final String SRC_COUNT_LABEL = NAME + ".src.count";
  static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
  static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
  static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
  static final String PRESERVE_STATUS_LABEL
      = Options.PRESERVE_STATUS.propertyname + ".value";

  private JobConf conf;

  public void setConf(Configuration conf) {
    if (conf instanceof JobConf) {
      this.conf = (JobConf) conf;
    } else {
      this.conf = new JobConf(conf);
    }
  }

  public Configuration getConf() {
    return conf;
  }

  public DistCp(Configuration conf) {
    setConf(conf);
  }

  /**
   * An input/output pair of filenames.
   */
  static class FilePair implements Writable {
    FileStatus input = new FileStatus();
    String output;
    FilePair() { }
    FilePair(FileStatus input, String output) {
      this.input = input;
      this.output = output;
    }
    public void readFields(DataInput in) throws IOException {
      input.readFields(in);
      output = Text.readString(in);
    }
    public void write(DataOutput out) throws IOException {
      input.write(out);
      Text.writeString(out, output);
    }
    public String toString() {
      return input + " : " + output;
    }
  }

  /**
   * InputFormat of a distcp job responsible for generating splits of the src
   * file list.
   */
  static class CopyInputFormat implements InputFormat<Text, Text> {

    /**
     * Produce splits such that each is no greater than the quotient of the
     * total size and the number of splits requested.
     * @param job The handle to the JobConf object
     * @param numSplits Number of splits requested
     */
    public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
      int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
      long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
      String srcfilelist = job.get(SRC_LIST_LABEL, "");
      if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
        throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
                                   ") total_size(" + cbsize + ") listuri(" +
                                   srcfilelist + ")");
      }
      Path src = new Path(srcfilelist);
      FileSystem fs = src.getFileSystem(job);
      FileStatus srcst = fs.getFileStatus(src);

      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
      LongWritable key = new LongWritable();
      FilePair value = new FilePair();
      final long targetsize = cbsize / numSplits;
      long pos = 0L;
      long last = 0L;
      long acc = 0L;
      long cbrem = srcst.getLen();
      SequenceFile.Reader sl = null;
      try {
        sl = new SequenceFile.Reader(fs, src, job);
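        // Walk the src file list: each record's key is that file's length.
        // Accumulate lengths until the target split size would be exceeded,
        // then cut a split at the previous record boundary.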
        for (; sl.next(key, value); last = sl.getPosition()) {
          // if adding this split would put this split past the target size,
          // cut the last split and put this next file in the next split.
          if (acc + key.get() > targetsize && acc != 0) {
            long splitsize = last - pos;
            splits.add(new FileSplit(src, pos, splitsize, (String[])null));
            cbrem -= splitsize;
            pos = last;
            acc = 0L;
          }
          acc += key.get();
        }
      }
      finally {
        checkAndClose(sl);
      }
      if (cbrem != 0) {
        splits.add(new FileSplit(src, pos, cbrem, (String[])null));
      }

      return splits.toArray(new FileSplit[splits.size()]);
    }

    /**
     * Returns a reader for this split of the src file list.
     */
    public RecordReader<Text, Text> getRecordReader(InputSplit split,
        JobConf job, Reporter reporter) throws IOException {
      return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split);
    }
  }

  /**
   * FSCopyFilesMapper: The mapper for copying files between FileSystems.
   */
  static class CopyFilesMapper
      implements Mapper<LongWritable, FilePair, WritableComparable<?>, Text> {
    // config
    private int sizeBuf = 128 * 1024;
    private FileSystem destFileSys = null;
    private boolean ignoreReadFailures;
    private boolean preserve_status;
    private EnumSet<FileAttribute> preseved;
    private boolean overwrite;
    private boolean update;
    private Path destPath = null;
    private byte[] buffer = null;
    private JobConf job;
    private boolean skipCRCCheck = false;

    // stats
    private int failcount = 0;
    private int skipcount = 0;
    private int copycount = 0;

    private String getCountString() {
      return "Copied: " + copycount + " Skipped: " + skipcount
          + " Failed: " + failcount;
    }
    private void updateStatus(Reporter reporter) {
      reporter.setStatus(getCountString());
    }

    /**
     * Return true if dst should be replaced by src and the update flag is set.
     * Right now, this merely checks that the src and dst len are not equal.
     * This should be improved on once modification times, CRCs, etc. can
     * be meaningful in this context.
     * @throws IOException
     */
    private boolean needsUpdate(FileStatus srcstatus,
        FileSystem dstfs, Path dstpath) throws IOException {
      return update && !sameFile(srcstatus.getPath().getFileSystem(job),
          srcstatus, dstfs, dstpath, skipCRCCheck);
    }

    private FSDataOutputStream create(Path f, Reporter reporter,
        FileStatus srcstat) throws IOException {
      if (destFileSys.exists(f)) {
        destFileSys.delete(f, false);
      }
      if (!preserve_status) {
        return destFileSys.create(f, true, sizeBuf, reporter);
      }

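      // Preserve the requested attributes (permission, replication, block
      // size) at create time when -p is in effect.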
      FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
          srcstat.getPermission(): null;
      short replication = preseved.contains(FileAttribute.REPLICATION)?
          srcstat.getReplication(): destFileSys.getDefaultReplication();
      long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
          srcstat.getBlockSize(): destFileSys.getDefaultBlockSize();
      return destFileSys.create(f, permission, true, sizeBuf, replication,
          blockSize, reporter);
    }

    /**
     * Copy a file to a destination.
     * @param srcstat src path and metadata
     * @param relativedst relative dst path
     * @param reporter
     */
    private void copy(FileStatus srcstat, Path relativedst,
        OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)
        throws IOException {
      Path absdst = new Path(destPath, relativedst);
      int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
      assert totfiles >= 0 : "Invalid file count " + totfiles;

      // if a directory, ensure created even if empty
      if (srcstat.isDir()) {
        if (destFileSys.exists(absdst)) {
          if (!destFileSys.getFileStatus(absdst).isDir()) {
            throw new IOException("Failed to mkdirs: " + absdst+" is a file.");
          }
        }
        else if (!destFileSys.mkdirs(absdst)) {
          throw new IOException("Failed to mkdirs " + absdst);
        }
        // TODO: when modification times can be set, directories should be
        // emitted to reducers so they might be preserved. Also, mkdirs does
        // not currently return an error when the directory already exists;
        // if this changes, all directory work might as well be done in reduce
        return;
      }

      if (destFileSys.exists(absdst) && !overwrite
          && !needsUpdate(srcstat, destFileSys, absdst)) {
        outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
        ++skipcount;
        reporter.incrCounter(Counter.SKIP, 1);
        updateStatus(reporter);
        return;
      }

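      // Copy into a temporary file under the job's tmp dir first; it is only
      // renamed into its final place after the byte count has been verified.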
      Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
      long cbcopied = 0L;
      FSDataInputStream in = null;
      FSDataOutputStream out = null;
      try {
        // open src file
        in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
        reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
        // open tmp file
        out = create(tmpfile, reporter, srcstat);
        // copy file
        for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
          out.write(buffer, 0, cbread);
          cbcopied += cbread;
          reporter.setStatus(
              String.format("%.2f ", cbcopied*100.0/srcstat.getLen())
              + absdst + " [ " +
              StringUtils.humanReadableInt(cbcopied) + " / " +
              StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
        }
      } finally {
        checkAndClose(in);
        checkAndClose(out);
      }

      if (cbcopied != srcstat.getLen()) {
        throw new IOException("File size not matched: copied "
            + bytesString(cbcopied) + " to tmpfile (=" + tmpfile
            + ") but expected " + bytesString(srcstat.getLen())
            + " from " + srcstat.getPath());
      }
      else {
        if (totfiles == 1) {
          // Copying a single file; use dst path provided by user as destination
          // rather than destination directory, if a file
          Path dstparent = absdst.getParent();
          if (!(destFileSys.exists(dstparent) &&
                destFileSys.getFileStatus(dstparent).isDir())) {
            absdst = dstparent;
          }
        }
        if (destFileSys.exists(absdst) &&
            destFileSys.getFileStatus(absdst).isDir()) {
          throw new IOException(absdst + " is a directory");
        }
        if (!destFileSys.mkdirs(absdst.getParent())) {
          throw new IOException("Failed to create parent dir: " + absdst.getParent());
        }
        rename(tmpfile, absdst);

        FileStatus dststat = destFileSys.getFileStatus(absdst);
        if (dststat.getLen() != srcstat.getLen()) {
          destFileSys.delete(absdst, false);
          throw new IOException("File size not matched: copied "
              + bytesString(dststat.getLen()) + " to dst (=" + absdst
              + ") but expected " + bytesString(srcstat.getLen())
              + " from " + srcstat.getPath());
        }
        updatePermissions(srcstat, dststat);
      }

      // report at least once for each file
      ++copycount;
      reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
      reporter.incrCounter(Counter.COPY, 1);
      updateStatus(reporter);
    }

    /** rename tmp to dst, delete dst if already exists */
    private void rename(Path tmp, Path dst) throws IOException {
      try {
        if (destFileSys.exists(dst)) {
          destFileSys.delete(dst, true);
        }
        if (!destFileSys.rename(tmp, dst)) {
          throw new IOException();
        }
      }
      catch(IOException cause) {
        throw (IOException)new IOException("Fail to rename tmp file (=" + tmp
            + ") to destination file (=" + dst + ")").initCause(cause);
      }
    }

    private void updatePermissions(FileStatus src, FileStatus dst
        ) throws IOException {
      if (preserve_status) {
        DistCp.updatePermissions(src, dst, preseved, destFileSys);
      }
    }

    static String bytesString(long b) {
      return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
    }

    /** Mapper configuration.
     * Extracts source and destination file system, as well as
     * top-level paths on source and destination directories.
     * Gets the named file systems, to be used later in map.
     */
    public void configure(JobConf job)
    {
      destPath = new Path(job.get(DST_DIR_LABEL, "/"));
      try {
        destFileSys = destPath.getFileSystem(job);
      } catch (IOException ex) {
        throw new RuntimeException("Unable to get the named file system.", ex);
      }
      sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
      buffer = new byte[sizeBuf];
      ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
      preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
      if (preserve_status) {
        preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL));
      }
      update = job.getBoolean(Options.UPDATE.propertyname, false);
      overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
      skipCRCCheck = job.getBoolean(Options.SKIPCRC.propertyname, false);
      this.job = job;
    }

    /** Map method. Copies one file from source file system to destination.
     * @param key src len
     * @param value FilePair (FileStatus src, Path dst)
     * @param out Log of failed copies
     * @param reporter
     */
    public void map(LongWritable key,
                    FilePair value,
                    OutputCollector<WritableComparable<?>, Text> out,
                    Reporter reporter) throws IOException {
      final FileStatus srcstat = value.input;
      final Path relativedst = new Path(value.output);
      try {
        copy(srcstat, relativedst, out, reporter);
      } catch (IOException e) {
        ++failcount;
        reporter.incrCounter(Counter.FAIL, 1);
        updateStatus(reporter);
        final String sfailure = "FAIL " + relativedst + " : " +
                          StringUtils.stringifyException(e);
        out.collect(null, new Text(sfailure));
        LOG.info(sfailure);
        try {
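          // Best-effort cleanup: retry a few times to remove the partially
          // copied temp file so a re-run of this task starts clean.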
          for (int i = 0; i < 3; ++i) {
            try {
              final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst);
              if (destFileSys.delete(tmp, true))
                break;
            } catch (Throwable ex) {
              // ignore, we are just cleaning up
              LOG.debug("Ignoring cleanup exception", ex);
            }
            // update status, so we don't get timed out
            updateStatus(reporter);
            Thread.sleep(3 * 1000);
          }
        } catch (InterruptedException inte) {
          throw (IOException)new IOException().initCause(inte);
        }
      } finally {
        updateStatus(reporter);
      }
    }

    public void close() throws IOException {
      if (0 == failcount || ignoreReadFailures) {
        return;
      }
      throw new IOException(getCountString());
    }
  }

  private static List<Path> fetchFileList(Configuration conf, Path srcList)
      throws IOException {
    List<Path> result = new ArrayList<Path>();
    FileSystem fs = srcList.getFileSystem(conf);
    BufferedReader input = null;
    try {
      input = new BufferedReader(new InputStreamReader(fs.open(srcList)));
      String line = input.readLine();
      while (line != null) {
        result.add(new Path(line));
        line = input.readLine();
      }
    } finally {
      checkAndClose(input);
    }
    return result;
  }

  @Deprecated
  public static void copy(Configuration conf, String srcPath,
                          String destPath, Path logPath,
                          boolean srcAsList, boolean ignoreReadFailures)
      throws IOException {
    final Path src = new Path(srcPath);
    List<Path> tmp = new ArrayList<Path>();
    if (srcAsList) {
      tmp.addAll(fetchFileList(conf, src));
    } else {
      tmp.add(src);
    }
    EnumSet<Options> flags = ignoreReadFailures
      ? EnumSet.of(Options.IGNORE_READ_FAILURES)
      : EnumSet.noneOf(Options.class);

    final Path dst = new Path(destPath);
    copy(conf, new Arguments(tmp, dst, logPath, flags, null,
        Long.MAX_VALUE, Long.MAX_VALUE, null));
  }

  /** Sanity check for srcPath */
  private static void checkSrcPath(JobConf jobConf, List<Path> srcPaths)
  throws IOException {
    List<IOException> rslt = new ArrayList<IOException>();

    Path[] ps = new Path[srcPaths.size()];
    ps = srcPaths.toArray(ps);
    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(), ps, jobConf);

    for (Path p : srcPaths) {
      FileSystem fs = p.getFileSystem(jobConf);
      if (!fs.exists(p)) {
        rslt.add(new IOException("Input source " + p + " does not exist."));
      }
    }
    if (!rslt.isEmpty()) {
      throw new InvalidInputException(rslt);
    }
  }

  /**
   * Driver to copy srcPath to destPath depending on required protocol.
   * @param args arguments
   */
  static void copy(final Configuration conf, final Arguments args
      ) throws IOException {
    LOG.info("srcPaths=" + args.srcs);
    LOG.info("destPath=" + args.dst);

    JobConf job = createJobConf(conf);

    checkSrcPath(job, args.srcs);
    if (args.preservedAttributes != null) {
      job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
    }
    if (args.mapredSslConf != null) {
      job.set("dfs.https.client.keystore.resource", args.mapredSslConf);
    }

    //Initialize the mapper
    try {
      if (setup(conf, job, args)) {
        JobClient.runJob(job);
      }
      finalize(conf, job, args.dst, args.preservedAttributes);
    } finally {
      //delete tmp
      fullyDelete(job.get(TMP_DIR_LABEL), job);
      //delete jobDirectory
      fullyDelete(job.get(JOB_DIR_LABEL), job);
    }
  }

  private static void updatePermissions(FileStatus src, FileStatus dst,
      EnumSet<FileAttribute> preseved, FileSystem destFileSys
      ) throws IOException {
    String owner = null;
    String group = null;
    if (preseved.contains(FileAttribute.USER)
        && !src.getOwner().equals(dst.getOwner())) {
      owner = src.getOwner();
    }
    if (preseved.contains(FileAttribute.GROUP)
        && !src.getGroup().equals(dst.getGroup())) {
      group = src.getGroup();
    }
    if (owner != null || group != null) {
      destFileSys.setOwner(dst.getPath(), owner, group);
    }
    if (preseved.contains(FileAttribute.PERMISSION)
        && !src.getPermission().equals(dst.getPermission())) {
      destFileSys.setPermission(dst.getPath(), src.getPermission());
    }
  }

  static private void finalize(Configuration conf, JobConf jobconf,
      final Path destPath, String presevedAttributes) throws IOException {
    if (presevedAttributes == null) {
      return;
    }
    EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
    if (!preseved.contains(FileAttribute.USER)
        && !preseved.contains(FileAttribute.GROUP)
        && !preseved.contains(FileAttribute.PERMISSION)) {
      return;
    }

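    // Re-apply owner/group/permission to every destination directory recorded
    // during setup; the mapper only preserves these attributes on files.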
    FileSystem dstfs = destPath.getFileSystem(conf);
    Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
    SequenceFile.Reader in = null;
    try {
      in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf),
          dstdirlist, jobconf);
      Text dsttext = new Text();
      FilePair pair = new FilePair();
      for(; in.next(dsttext, pair); ) {
        Path absdst = new Path(destPath, pair.output);
        updatePermissions(pair.input, dstfs.getFileStatus(absdst),
            preseved, dstfs);
      }
    } finally {
      checkAndClose(in);
    }
  }

  static private class Arguments {
    final List<Path> srcs;
    final Path dst;
    final Path log;
    final EnumSet<Options> flags;
    final String preservedAttributes;
    final long filelimit;
    final long sizelimit;
    final String mapredSslConf;

    /**
     * Arguments for distcp
     * @param srcs List of source paths
     * @param dst Destination path
     * @param log Log output directory
     * @param flags Command-line flags
     * @param preservedAttributes Preserved attributes
     * @param filelimit File limit
     * @param sizelimit Size limit
     * @param mapredSslConf Filename of SSL configuration for mapper task
     */
    Arguments(List<Path> srcs, Path dst, Path log,
        EnumSet<Options> flags, String preservedAttributes,
        long filelimit, long sizelimit, String mapredSslConf) {
      this.srcs = srcs;
      this.dst = dst;
      this.log = log;
      this.flags = flags;
      this.preservedAttributes = preservedAttributes;
      this.filelimit = filelimit;
      this.sizelimit = sizelimit;
      this.mapredSslConf = mapredSslConf;

      if (LOG.isTraceEnabled()) {
        LOG.trace("this = " + this);
      }
    }

    static Arguments valueOf(String[] args, Configuration conf
        ) throws IOException {
      List<Path> srcs = new ArrayList<Path>();
      Path dst = null;
      Path log = null;
      EnumSet<Options> flags = EnumSet.noneOf(Options.class);
      String presevedAttributes = null;
      String mapredSslConf = null;
      long filelimit = Long.MAX_VALUE;
      long sizelimit = Long.MAX_VALUE;

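      // Scan the arguments left to right; option flags are matched by prefix
      // so that combined forms such as -prbugp resolve to PRESERVE_STATUS.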
      for (int idx = 0; idx < args.length; idx++) {
        Options[] opt = Options.values();
        int i = 0;
        for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++);

        if (i < opt.length) {
          flags.add(opt[i]);
          if (opt[i] == Options.PRESERVE_STATUS) {
            presevedAttributes =  args[idx].substring(2);
            FileAttribute.parse(presevedAttributes); //validation
          }
          else if (opt[i] == Options.FILE_LIMIT) {
            filelimit = Options.FILE_LIMIT.parseLong(args, ++idx);
          }
          else if (opt[i] == Options.SIZE_LIMIT) {
            sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx);
          }
        } else if ("-f".equals(args[idx])) {
          if (++idx ==  args.length) {
            throw new IllegalArgumentException("urilist_uri not specified in -f");
          }
          srcs.addAll(fetchFileList(conf, new Path(args[idx])));
        } else if ("-log".equals(args[idx])) {
          if (++idx ==  args.length) {
            throw new IllegalArgumentException("logdir not specified in -log");
          }
          log = new Path(args[idx]);
        } else if ("-mapredSslConf".equals(args[idx])) {
          if (++idx ==  args.length) {
            throw new IllegalArgumentException("ssl conf file not specified in -mapredSslConf");
          }
          mapredSslConf = args[idx];
        } else if ("-m".equals(args[idx])) {
          if (++idx == args.length) {
            throw new IllegalArgumentException("num_maps not specified in -m");
          }
          try {
            conf.setInt(MAX_MAPS_LABEL, Integer.valueOf(args[idx]));
          } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid argument to -m: " +
                                               args[idx]);
          }
        } else if ('-' == args[idx].codePointAt(0)) {
          throw new IllegalArgumentException("Invalid switch " + args[idx]);
        } else if (idx == args.length -1) {
          dst = new Path(args[idx]);
        } else {
          srcs.add(new Path(args[idx]));
        }
      }
      // mandatory command-line parameters
      if (srcs.isEmpty() || dst == null) {
        throw new IllegalArgumentException("Missing "
            + (dst == null ? "dst path" : "src"));
      }
      // incompatible command-line flags
      final boolean isOverwrite = flags.contains(Options.OVERWRITE);
      final boolean isUpdate = flags.contains(Options.UPDATE);
      final boolean isDelete = flags.contains(Options.DELETE);
      final boolean skipCRC = flags.contains(Options.SKIPCRC);

      if (isOverwrite && isUpdate) {
        throw new IllegalArgumentException("Conflicting overwrite policies");
      }
      if (isDelete && !isOverwrite && !isUpdate) {
        throw new IllegalArgumentException(Options.DELETE.cmd
            + " must be specified with " + Options.OVERWRITE + " or "
            + Options.UPDATE + ".");
      }
      if (!isUpdate && skipCRC) {
        throw new IllegalArgumentException(
            Options.SKIPCRC.cmd + " is relevant only with the " +
            Options.UPDATE.cmd + " option");
      }
      return new Arguments(srcs, dst, log, flags, presevedAttributes,
          filelimit, sizelimit, mapredSslConf);
    }

    /** {@inheritDoc} */
    public String toString() {
      return getClass().getName() + "{"
          + "\n  srcs = " + srcs
          + "\n  dst = " + dst
          + "\n  log = " + log
          + "\n  flags = " + flags
          + "\n  preservedAttributes = " + preservedAttributes
          + "\n  filelimit = " + filelimit
          + "\n  sizelimit = " + sizelimit
          + "\n  mapredSslConf = " + mapredSslConf
          + "\n}";
    }
  }

  /**
   * This is the main driver for recursively copying directories
   * across file systems. It takes at least two cmdline parameters. A source
   * URL and a destination URL. It then essentially does an "ls -lR" on the
   * source URL, and writes the output in a round-robin manner to all the map
   * input files. The mapper actually copies the files allotted to it. The
   * reduce is empty.
   */
  public int run(String[] args) {
    try {
      copy(conf, Arguments.valueOf(args, conf));
      return 0;
    } catch (IllegalArgumentException e) {
      System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    } catch (DuplicationException e) {
      System.err.println(StringUtils.stringifyException(e));
      return DuplicationException.ERROR_CODE;
    } catch (RemoteException e) {
      final IOException unwrapped = e.unwrapRemoteException(
          FileNotFoundException.class,
          AccessControlException.class,
          QuotaExceededException.class);
      System.err.println(StringUtils.stringifyException(unwrapped));
      return -3;
    } catch (Exception e) {
      System.err.println("With failures, global counters are inaccurate; " +
          "consider running with -i");
      System.err.println("Copy failed: " + StringUtils.stringifyException(e));
      return -999;
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(DistCp.class);
    DistCp distcp = new DistCp(job);
    int res = ToolRunner.run(distcp, args);
    System.exit(res);
  }
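
  // Example invocation (illustrative; namenode hosts and paths are
  // placeholders, mirroring the usage text above):
  //   hadoop distcp -p -update hdfs://A:8020/user/foo/bar hdfs://B:8020/user/foo/baz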

  /**
   * Make a path relative with respect to a root path.
   * absPath is always assumed to descend from root.
   * Otherwise returned path is null.
   */
  static String makeRelative(Path root, Path absPath) {
    if (!absPath.isAbsolute()) {
      throw new IllegalArgumentException("!absPath.isAbsolute(), absPath="
          + absPath);
    }
    String p = absPath.toUri().getPath();

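    // Consume matching leading path components; a mismatch means absPath does
    // not descend from root, so null is returned.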
    StringTokenizer pathTokens = new StringTokenizer(p, "/");
    for(StringTokenizer rootTokens = new StringTokenizer(
        root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) {
      if (!rootTokens.nextToken().equals(pathTokens.nextToken())) {
        return null;
      }
    }
    StringBuilder sb = new StringBuilder();
    for(; pathTokens.hasMoreTokens(); ) {
      sb.append(pathTokens.nextToken());
      if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); }
    }
    return sb.length() == 0? ".": sb.toString();
  }

  /**
   * Calculate how many maps to run.
   * Number of maps is bounded by a minimum of the cumulative size of the
   * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the
   * command line) and at most (distcp.max.map.tasks, default
   * MAX_MAPS_PER_NODE * nodes in the cluster).
   * @param totalBytes Count of total bytes for job
   * @param job The job to configure
   */
  private static void setMapCount(long totalBytes, JobConf job)
      throws IOException {
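    // One map per distcp.bytes.per.map bytes of input, capped by the
    // configured maximum (default: MAX_MAPS_PER_NODE * tasktrackers), and
    // never fewer than one.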
    int numMaps =
      (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP));
    numMaps = Math.min(numMaps,
        job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE *
          new JobClient(job).getClusterStatus().getTaskTrackers()));
    job.setNumMapTasks(Math.max(numMaps, 1));
  }

  /** Fully delete dir */
  static void fullyDelete(String dir, Configuration conf) throws IOException {
    if (dir != null) {
      Path tmp = new Path(dir);
      tmp.getFileSystem(conf).delete(tmp, true);
    }
  }

  //Job configuration
  private static JobConf createJobConf(Configuration conf) {
    JobConf jobconf = new JobConf(conf, DistCp.class);
    jobconf.setJobName(NAME);

    // turn off speculative execution, because DFS doesn't handle
    // multiple writers to the same file.
    jobconf.setMapSpeculativeExecution(false);

    jobconf.setInputFormat(CopyInputFormat.class);
    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(Text.class);

    jobconf.setMapperClass(CopyFilesMapper.class);
    jobconf.setNumReduceTasks(0);
    return jobconf;
  }

  private static final Random RANDOM = new Random();
  public static String getRandomId() {
    return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
  }

  /**
   * Initialize DFSCopyFileMapper specific job-configuration.
   * @param conf : The dfs/mapred configuration.
   * @param jobConf : The handle to the jobConf object to be initialized.
   * @param args Arguments
   * @return true if it is necessary to launch a job.
   */
  private static boolean setup(Configuration conf, JobConf jobConf,
                            final Arguments args)
      throws IOException {
    jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString());

    //set boolean values
    final boolean update = args.flags.contains(Options.UPDATE);
    final boolean skipCRCCheck = args.flags.contains(Options.SKIPCRC);
    final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE);
    jobConf.setBoolean(Options.UPDATE.propertyname, update);
    jobConf.setBoolean(Options.SKIPCRC.propertyname, skipCRCCheck);
    jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
    jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
        args.flags.contains(Options.IGNORE_READ_FAILURES));
    jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
        args.flags.contains(Options.PRESERVE_STATUS));

    final String randomId = getRandomId();
    JobClient jClient = new JobClient(jobConf);
    Path stagingArea;
    try {
      stagingArea = JobSubmissionFiles.getStagingDir(jClient, conf);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }

    Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId);
    FsPermission mapredSysPerms =
      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    // FileSystem.mkdirs(jClient.getFs(), jobDirectory, mapredSysPerms);
    FileSystem.mkdirs(FileSystem.get(jobDirectory.toUri(), conf), jobDirectory, mapredSysPerms);
    jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());

    FileSystem dstfs = args.dst.getFileSystem(conf);

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials(),
                                        new Path[] {args.dst}, conf);

    boolean dstExists = dstfs.exists(args.dst);
    boolean dstIsDir = false;
    if (dstExists) {
      dstIsDir = dstfs.getFileStatus(args.dst).isDir();
    }

    // default logPath
    Path logPath = args.log;
    if (logPath == null) {
      String filename = "_distcp_logs_" + randomId;
      if (!dstExists || !dstIsDir) {
        Path parent = args.dst.getParent();
        if (!dstfs.exists(parent)) {
          dstfs.mkdirs(parent);
        }
        logPath = new Path(parent, filename);
      } else {
        logPath = new Path(args.dst, filename);
      }
    }
    FileOutputFormat.setOutputPath(jobConf, logPath);

    // create src list, dst list
    FileSystem jobfs = jobDirectory.getFileSystem(jobConf);

    Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
    jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
    SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
        srcfilelist, LongWritable.class, FilePair.class,
        SequenceFile.CompressionType.NONE);

    Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
    SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
        dstfilelist, Text.class, Text.class,
        SequenceFile.CompressionType.NONE);

    Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
    jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
    SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf,
        dstdirlist, Text.class, FilePair.class,
        SequenceFile.CompressionType.NONE);

    // handle the case where the destination directory doesn't exist
    // and we've only a single src directory OR we're updating/overwriting
    // the contents of the destination directory.
    final boolean special =
      (args.srcs.size() == 1 && !dstExists) || update || overwrite;
    int srcCount = 0, cnsyncf = 0, dirsyn = 0;
    long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
    try {
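      // Walk each source tree with an explicit stack, recording every path in
      // the dst list, every non-skipped file/dir in the src list (directories
      // with length 0), and directories in the dir list for attribute fix-up.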
      for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
        final Path src = srcItr.next();
        FileSystem srcfs = src.getFileSystem(conf);
        FileStatus srcfilestat = srcfs.getFileStatus(src);
        Path root = special && srcfilestat.isDir()? src: src.getParent();
        if (srcfilestat.isDir()) {
          ++srcCount;
        }

        Stack<FileStatus> pathstack = new Stack<FileStatus>();
        for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
          FileStatus cur = pathstack.pop();
          FileStatus[] children = srcfs.listStatus(cur.getPath());
          for(int i = 0; i < children.length; i++) {
            boolean skipfile = false;
            final FileStatus child = children[i];
            final String dst = makeRelative(root, child.getPath());
            ++srcCount;

            if (child.isDir()) {
              pathstack.push(child);
            }
            else {
              //skip file if the src and the dst files are the same.
              skipfile = update &&
                sameFile(srcfs, child, dstfs,
                  new Path(args.dst, dst), skipCRCCheck);
              //skip file if it exceed file limit or size limit
              skipfile |= fileCount == args.filelimit
                          || byteCount + child.getLen() > args.sizelimit;

              if (!skipfile) {
                ++fileCount;
                byteCount += child.getLen();

                if (LOG.isTraceEnabled()) {
                  LOG.trace("adding file " + child.getPath());
                }

                ++cnsyncf;
                cbsyncs += child.getLen();
                if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
                  src_writer.sync();
                  dst_writer.sync();
                  cnsyncf = 0;
                  cbsyncs = 0L;
                }
              }
            }

            if (!skipfile) {
              src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
                  new FilePair(child, dst));
            }

            dst_writer.append(new Text(dst),
                new Text(child.getPath().toString()));
          }

          if (cur.isDir()) {
            String dst = makeRelative(root, cur.getPath());
            dir_writer.append(new Text(dst), new FilePair(cur, dst));
            if (++dirsyn > SYNC_FILE_MAX) {
              dirsyn = 0;
              dir_writer.sync();
            }
          }
        }
      }
    } finally {
      checkAndClose(src_writer);
      checkAndClose(dst_writer);
      checkAndClose(dir_writer);
    }

    FileStatus dststatus = null;
    try {
      dststatus = dstfs.getFileStatus(args.dst);
    } catch(FileNotFoundException fnfe) {
      LOG.info(args.dst + " does not exist.");
    }

    // create dest path dir if copying > 1 file
    if (dststatus == null) {
      if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
        throw new IOException("Failed to create " + args.dst);
      }
    }

    final Path sorted = new Path(jobDirectory, "_distcp_sorted");
    checkDuplication(jobfs, dstfilelist, sorted, conf);

    if (dststatus != null && args.flags.contains(Options.DELETE)) {
      deleteNonexisting(dstfs, dststatus, sorted,
          jobfs, jobDirectory, jobConf, conf);
    }

    Path tmpDir = new Path(
        (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
        args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
    jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
    LOG.info("sourcePathsCount=" + srcCount);
    LOG.info("filesToCopyCount=" + fileCount);
    LOG.info("bytesToCopyCount=" + StringUtils.humanReadableInt(byteCount));
    jobConf.setInt(SRC_COUNT_LABEL, srcCount);
    jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
    setMapCount(byteCount, jobConf);
    return fileCount > 0;
  }

  /**
   * Check whether the contents of src and dst are the same.
   *
   * Return false if dstpath does not exist
   *
   * If the files have different sizes, return false.
   *
   * If the files have the same sizes, the file checksums will be compared.
   *
   * When file checksum is not supported in any of file systems,
   * two files are considered as the same if they have the same size.
   */
  static private boolean sameFile(FileSystem srcfs, FileStatus srcstatus,
      FileSystem dstfs, Path dstpath, boolean skipCRCCheck) throws IOException {
    FileStatus dststatus;
    try {
      dststatus = dstfs.getFileStatus(dstpath);
    } catch(FileNotFoundException fnfe) {
      return false;
    }

    //same length?
    if (srcstatus.getLen() != dststatus.getLen()) {
      return false;
    }

    if (skipCRCCheck) {
      LOG.debug("Skipping CRC Check");
      return true;
    }

    //get src checksum
    final FileChecksum srccs;
    try {
      srccs = srcfs.getFileChecksum(srcstatus.getPath());
    } catch(FileNotFoundException fnfe) {
      /*
       * Two possible cases:
       * (1) src existed once but was deleted between the time period that
       *     srcstatus was obtained and the try block above.
       * (2) srcfs does not support file checksum and (incorrectly) throws
       *     FNFE, e.g. some previous versions of HftpFileSystem.
       * For case (1), it is okay to return true since src was already deleted.
       * For case (2), true should be returned.
       */
      return true;
    }

    //compare checksums
    try {
      final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath());
      //return true if checksum is not supported
      //(i.e. some of the checksums is null)
      return srccs == null || dstcs == null || srccs.equals(dstcs);
    } catch(FileNotFoundException fnfe) {
      return false;
    }
  }

  /** Delete the dst files/dirs which do not exist in src */
  static private void deleteNonexisting(
      FileSystem dstfs, FileStatus dstroot, Path dstsorted,
      FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
      ) throws IOException {
    if (!dstroot.isDir()) {
      throw new IOException("dst must be a directory when option "
          + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
          + ") is not a directory.");
    }

    //write dst lsr results
    final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
    final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
        dstlsr, Text.class, FileStatus.class,
        SequenceFile.CompressionType.NONE);
    try {
      //do lsr to get all file statuses in dstroot
      final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
      for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
        final FileStatus status = lsrstack.pop();
        if (status.isDir()) {
          for(FileStatus child : dstfs.listStatus(status.getPath())) {
            String relative = makeRelative(dstroot.getPath(), child.getPath());
            writer.append(new Text(relative), child);
            lsrstack.push(child);
          }
        }
      }
    } finally {
      checkAndClose(writer);
    }

    //sort lsr results
    final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
        new Text.Comparator(), Text.class, FileStatus.class, jobconf);
    sorter.sort(dstlsr, sortedlsr);

    //compare lsr list and dst list
    SequenceFile.Reader lsrin = null;
    SequenceFile.Reader dstin = null;
    try {
      lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
      dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);

      //compare sorted lsr list and sorted dst list
      final Text lsrpath = new Text();
      final FileStatus lsrstatus = new FileStatus();
      final Text dstpath = new Text();
      final Text dstfrom = new Text();
      final FsShell shell = new FsShell(conf);
      final String[] shellargs = {"-rmr", null};

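      // Merge-compare the two sorted lists: any path present in the dst "lsr"
      // listing but absent from the copied-file list is removed via "fs -rmr",
      // skipping children of a directory that has already been deleted.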
      boolean hasnext = dstin.next(dstpath, dstfrom);
      for(; lsrin.next(lsrpath, lsrstatus); ) {
        int dst_cmp_lsr = dstpath.compareTo(lsrpath);
        for(; hasnext && dst_cmp_lsr < 0; ) {
          hasnext = dstin.next(dstpath, dstfrom);
          dst_cmp_lsr = dstpath.compareTo(lsrpath);
        }

        if (dst_cmp_lsr == 0) {
          //lsrpath exists in dst, skip it
          hasnext = dstin.next(dstpath, dstfrom);
        }
        else {
          //lsrpath does not exist, delete it
          String s = new Path(dstroot.getPath(), lsrpath.toString()).toString();
          if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) {
            shellargs[1] = s;
            int r = 0;
            try {
              r = shell.run(shellargs);
            } catch(Exception e) {
              throw new IOException("Exception from shell.", e);
            }
            if (r != 0) {
              throw new IOException("\"" + shellargs[0] + " " + shellargs[1]
                  + "\" returns non-zero value " + r);
            }
          }
        }
      }
    } finally {
      checkAndClose(lsrin);
      checkAndClose(dstin);
    }
  }

  //is x an ancestor path of y?
  static private boolean isAncestorPath(String x, String y) {
    if (!y.startsWith(x)) {
      return false;
    }
    final int len = x.length();
    return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR;
  }

  /** Check whether the file list contains duplicates. */
  static private void checkDuplication(FileSystem fs, Path file, Path sorted,
    Configuration conf) throws IOException {
    SequenceFile.Reader in = null;
    try {
      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
        new Text.Comparator(), Text.class, Text.class, conf);
      sorter.sort(file, sorted);
      in = new SequenceFile.Reader(fs, sorted, conf);

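      // After sorting by destination path, duplicates are adjacent, so a
      // single pass comparing each record with its predecessor suffices.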
      Text prevdst = null, curdst = new Text();
      Text prevsrc = null, cursrc = new Text();
      for(; in.next(curdst, cursrc); ) {
        if (prevdst != null && curdst.equals(prevdst)) {
          throw new DuplicationException(
            "Invalid input, there are duplicated files in the sources: "
            + prevsrc + ", " + cursrc);
        }
        prevdst = curdst;
        curdst = new Text();
        prevsrc = cursrc;
        cursrc = new Text();
      }
    }
    finally {
      checkAndClose(in);
    }
  }

  static boolean checkAndClose(java.io.Closeable io) {
    if (io != null) {
      try {
        io.close();
      }
      catch(IOException ioe) {
        LOG.warn(StringUtils.stringifyException(ioe));
        return false;
      }
    }
    return true;
  }

  /** An exception class for duplicated source files. */
  public static class DuplicationException extends IOException {
    private static final long serialVersionUID = 1L;
    /** Error code for this exception */
    public static final int ERROR_CODE = -2;
    DuplicationException(String message) {super(message);}
  }
}