/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;

/**
 * A Map-reduce program to recursively change file properties
 * such as owner, group and permission.
 */
public class DistCh extends DistTool {
  static final String NAME = "distch";
  static final String JOB_DIR_LABEL = NAME + ".job.dir";
  static final String OP_LIST_LABEL = NAME + ".op.list";
  static final String OP_COUNT_LABEL = NAME + ".op.count";

  static final String USAGE = "java " + DistCh.class.getName()
      + " [OPTIONS] <path:owner:group:permission>+ "

      + "\n\nThe values of owner, group and permission can be empty."
      + "\nPermission is an octal number."

      + "\n\nOPTIONS:"
      + "\n-f <urilist_uri>       Use list at <urilist_uri> as src list"
      + "\n-i                     Ignore failures"
      + "\n-log <logdir>          Write logs to <logdir>"
      ;
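
  // Illustrative invocation (running via the "hadoop" launcher is an
  // assumption; the argument syntax is taken from USAGE above):
  //
  //   hadoop org.apache.hadoop.tools.DistCh -i /user/foo:foo:bar:700
  //
  // recursively sets owner "foo", group "bar" and mode 700 under /user/foo,
  // ignoring individual failures.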
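
  // Sizing knobs: roughly 1000 operations per map task, at most 20 maps per
  // cluster node, and a SequenceFile sync marker after every 10 records
  // appended to the operation list.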
  private static final long OP_PER_MAP = 1000;
  private static final int MAX_MAPS_PER_NODE = 20;
  private static final int SYNC_FILE_MAX = 10;

  static enum Counter { SUCCEED, FAIL }

  static enum Option {
    IGNORE_FAILURES("-i", NAME + ".ignore.failures");

    final String cmd, propertyname;

    private Option(String cmd, String propertyname) {
      this.cmd = cmd;
      this.propertyname = propertyname;
    }
  }

  DistCh(Configuration conf) {
    super(createJobConf(conf));
  }

  private static JobConf createJobConf(Configuration conf) {
    JobConf jobconf = new JobConf(conf, DistCh.class);
    jobconf.setJobName(NAME);
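    // Each map task mutates the file system (chown/chgrp/chmod), so
    // speculative execution is disabled to avoid duplicate attempts
    // racing on the same paths.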
    jobconf.setMapSpeculativeExecution(false);

    jobconf.setInputFormat(ChangeInputFormat.class);
    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(Text.class);

    jobconf.setMapperClass(ChangeFilesMapper.class);
    jobconf.setNumReduceTasks(0);
    return jobconf;
  }

  /** File operations. */
  static class FileOperation implements Writable {
    private Path src;
    private String owner;
    private String group;
    private FsPermission permission;

    FileOperation() {}

    FileOperation(Path src, FileOperation that) {
      this.src = src;
      this.owner = that.owner;
      this.group = that.group;
      this.permission = that.permission;
      checkState();
    }

    /**
     * path:owner:group:permission
     * e.g.
     * /user/foo:foo:bar:700
     */
    FileOperation(String line) {
      try {
        String[] t = line.split(":", 4);
        for(int i = 0; i < t.length; i++) {
          if ("".equals(t[i])) {
            t[i] = null;
          }
        }

        src = new Path(t[0]);
        owner = t[1];
        group = t[2];
        permission = t[3] == null? null:
          new FsPermission(Short.parseShort(t[3], 8));

        checkState();
      }
      catch(Exception e) {
        throw (IllegalArgumentException)new IllegalArgumentException(
            "line=" + line).initCause(e);
      }
    }

    private void checkState() throws IllegalStateException {
      if (owner == null && group == null && permission == null) {
        throw new IllegalStateException(
            "owner == null && group == null && permission == null");
      }
    }

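    // 0111 covers the execute bits. isDifferent() below treats a plain file
    // as already matching only when its current mode equals the requested
    // permission with these bits cleared (directories are compared exactly).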
    static final FsPermission FILE_UMASK
        = FsPermission.createImmutable((short)0111);

    private boolean isDifferent(FileStatus original) {
      if (owner != null && !owner.equals(original.getOwner())) {
        return true;
      }
      if (group != null && !group.equals(original.getGroup())) {
        return true;
      }
      if (permission != null) {
        FsPermission orig = original.getPermission();
        return original.isDirectory()? !permission.equals(orig):
          !permission.applyUMask(FILE_UMASK).equals(orig);
      }
      return false;
    }

    void run(Configuration conf) throws IOException {
      FileSystem fs = src.getFileSystem(conf);
      if (permission != null) {
        fs.setPermission(src, permission);
      }
      if (owner != null || group != null) {
        fs.setOwner(src, owner, group);
      }
    }

    /** {@inheritDoc} */
    public void readFields(DataInput in) throws IOException {
      this.src = new Path(Text.readString(in));
      owner = DistTool.readString(in);
      group = DistTool.readString(in);
      permission = in.readBoolean()? FsPermission.read(in): null;
    }

    /** {@inheritDoc} */
    public void write(DataOutput out) throws IOException {
      Text.writeString(out, src.toString());
      DistTool.writeString(out, owner);
      DistTool.writeString(out, group);

      boolean b = permission != null;
      out.writeBoolean(b);
      if (b) {permission.write(out);}
    }

    /** {@inheritDoc} */
    public String toString() {
      return src + ":" + owner + ":" + group + ":" + permission;
    }
  }
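
  // A minimal usage sketch (illustrative, not part of the tool; "conf" is an
  // assumed Configuration with access to the target FileSystem):
  //
  //   FileOperation op = new FileOperation("/user/foo:foo:bar:700");
  //   op.run(conf);  // applies setPermission(700), then setOwner("foo", "bar")
  //
  // Empty fields parse to null, so "/user/foo:::700" changes only the
  // permission; checkState() rejects a line where all three are empty.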

  /** Responsible for generating splits of the src file list. */
  static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
    /** Do nothing. */
    public void validateInput(JobConf job) {}

    /**
     * Produce splits such that each is no greater than the quotient of the
     * total size and the number of splits requested.
     * @param job The handle to the JobConf object
     * @param numSplits Number of splits requested
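     * For example, with 2,500 operations and numSplits of 4, targetcount is
     * 625 and a split boundary is emitted after roughly every 625 records.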
     */
    public InputSplit[] getSplits(JobConf job, int numSplits
        ) throws IOException {
      final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
      final int targetcount = srcCount / numSplits;
      String srclist = job.get(OP_LIST_LABEL, "");
      if (srcCount < 0 || "".equals(srclist)) {
        throw new RuntimeException("Invalid metadata: #files(" + srcCount +
                                   ") listuri(" + srclist + ")");
      }
      Path srcs = new Path(srclist);
      FileSystem fs = srcs.getFileSystem(job);

      List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

      Text key = new Text();
      FileOperation value = new FileOperation();
      long prev = 0L;
      int count = 0; //count src
      try (SequenceFile.Reader in = new SequenceFile.Reader(fs, srcs, job)) {
        while (in.next(key, value)) {
          long curr = in.getPosition();
          long delta = curr - prev;
          if (++count > targetcount) {
            count = 0;
            splits.add(new FileSplit(srcs, prev, delta, (String[])null));
            prev = curr;
          }
        }
      }
      long remaining = fs.getFileStatus(srcs).getLen() - prev;
      if (remaining != 0) {
        splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
      }
      LOG.info("numSplits=" + numSplits + ", splits.size()=" + splits.size());
      return splits.toArray(new FileSplit[splits.size()]);
    }

    /** {@inheritDoc} */
    public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
        JobConf job, Reporter reporter) throws IOException {
      return new SequenceFileRecordReader<Text, FileOperation>(job,
          (FileSplit)split);
    }
  }

  /** The mapper for changing files. */
  static class ChangeFilesMapper
      implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
    private JobConf jobconf;
    private boolean ignoreFailures;

    private int failcount = 0;
    private int succeedcount = 0;

    private String getCountString() {
      return "Succeeded: " + succeedcount + " Failed: " + failcount;
    }

    /** {@inheritDoc} */
    public void configure(JobConf job) {
      this.jobconf = job;
      ignoreFailures =
          job.getBoolean(Option.IGNORE_FAILURES.propertyname, false);
    }

    /** Run a FileOperation. */
    public void map(Text key, FileOperation value,
        OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
        ) throws IOException {
      try {
        value.run(jobconf);
        ++succeedcount;
        reporter.incrCounter(Counter.SUCCEED, 1);
      } catch (IOException e) {
        ++failcount;
        reporter.incrCounter(Counter.FAIL, 1);

        String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
        out.collect(null, new Text(s));
        LOG.info(s);
      } finally {
        reporter.setStatus(getCountString());
      }
    }

    /** {@inheritDoc} */
    public void close() throws IOException {
      if (failcount == 0 || ignoreFailures) {
        return;
      }
      throw new IOException(getCountString());
    }
  }

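  /** Sanity-check the source path of every operation. */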
  private static void check(Configuration conf, List<FileOperation> ops
      ) throws InvalidInputException {
    List<Path> srcs = new ArrayList<Path>();
    for(FileOperation op : ops) {
      srcs.add(op.src);
    }
    DistTool.checkSource(conf, srcs);
  }

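  /** Parse each line of the given input file into a FileOperation. */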
  private static List<FileOperation> fetchList(Configuration conf, Path inputfile
      ) throws IOException {
    List<FileOperation> result = new ArrayList<FileOperation>();
    for(String line : readFile(conf, inputfile)) {
      result.add(new FileOperation(line));
    }
    return result;
  }

  /** This is the main driver for recursively changing file properties. */
  public int run(String[] args) throws Exception {
    List<FileOperation> ops = new ArrayList<FileOperation>();
    Path logpath = null;
    boolean isIgnoreFailures = false;

    try {
      for (int idx = 0; idx < args.length; idx++) {
        if ("-f".equals(args[idx])) {
          if (++idx == args.length) {
            System.out.println("urilist_uri not specified");
            System.out.println(USAGE);
            return -1;
          }
          ops.addAll(fetchList(jobconf, new Path(args[idx])));
        } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
          isIgnoreFailures = true;
        } else if ("-log".equals(args[idx])) {
          if (++idx == args.length) {
            System.out.println("logdir not specified");
            System.out.println(USAGE);
            return -1;
          }
          logpath = new Path(args[idx]);
        } else if ('-' == args[idx].codePointAt(0)) {
          System.out.println("Invalid switch " + args[idx]);
          System.out.println(USAGE);
          ToolRunner.printGenericCommandUsage(System.out);
          return -1;
        } else {
          ops.add(new FileOperation(args[idx]));
        }
      }
      // mandatory command-line parameters
      if (ops.isEmpty()) {
        throw new IllegalStateException("Operation is empty");
      }
      LOG.info("ops=" + ops);
      LOG.info("isIgnoreFailures=" + isIgnoreFailures);
      jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
      check(jobconf, ops);

      try {
        if (setup(ops, logpath)) {
          JobClient.runJob(jobconf);
        }
      } finally {
        try {
          if (logpath == null) {
            //delete log directory
            final Path logdir = FileOutputFormat.getOutputPath(jobconf);
            if (logdir != null) {
              logdir.getFileSystem(jobconf).delete(logdir, true);
            }
          }
        }
        finally {
          //delete job directory
          final String jobdir = jobconf.get(JOB_DIR_LABEL);
          if (jobdir != null) {
            final Path jobpath = new Path(jobdir);
            jobpath.getFileSystem(jobconf).delete(jobpath, true);
          }
        }
      }
    } catch(DuplicationException e) {
      LOG.error("Input error:", e);
      return DuplicationException.ERROR_CODE;
    } catch(Exception e) {
      LOG.error(NAME + " failed: ", e);
      System.out.println(USAGE);
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }
    return 0;
  }

  /** Calculate how many maps to run. */
  private static int getMapCount(int srcCount, int numNodes) {
    int numMaps = (int)(srcCount / OP_PER_MAP);
    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
    return Math.max(numMaps, 1);
  }
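  // e.g. 25,000 operations on a 10-node cluster: min(25000/1000, 10*20)
  // = 25 maps; anything smaller than OP_PER_MAP is floored at one map.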
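
  /**
   * Prepare the job: create the staging directory, walk each source tree and
   * append every required change to the operation list (a SequenceFile),
   * check the list for duplicated sources, and set the number of map tasks.
   * @return true if there is at least one operation to run
   */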
  private boolean setup(List<FileOperation> ops, Path log)
      throws IOException {
    final String randomId = getRandomId();
    JobClient jClient = new JobClient(jobconf);
    Path stagingArea;
    try {
      stagingArea = JobSubmissionFiles.getStagingDir(
                       jClient.getClusterHandle(), jobconf);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    Path jobdir = new Path(stagingArea + NAME + "_" + randomId);
    FsPermission mapredSysPerms =
      new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
    FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);
    LOG.info(JOB_DIR_LABEL + "=" + jobdir);

    if (log == null) {
      log = new Path(jobdir, "_logs");
    }
    FileOutputFormat.setOutputPath(jobconf, log);
    LOG.info("log=" + log);

    //create operation list
    FileSystem fs = jobdir.getFileSystem(jobconf);
    Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
    jobconf.set(OP_LIST_LABEL, opList.toString());
    int opCount = 0, synCount = 0;
    try (SequenceFile.Writer opWriter = SequenceFile.createWriter(fs, jobconf,
            opList, Text.class, FileOperation.class,
            SequenceFile.CompressionType.NONE)) {
      for(FileOperation op : ops) {
        FileStatus srcstat = fs.getFileStatus(op.src);
        if (srcstat.isDirectory() && op.isDifferent(srcstat)) {
          ++opCount;
          opWriter.append(new Text(op.src.toString()), op);
        }

        Stack<Path> pathstack = new Stack<Path>();
        for(pathstack.push(op.src); !pathstack.empty(); ) {
          for(FileStatus stat : fs.listStatus(pathstack.pop())) {
            if (stat.isDirectory()) {
              pathstack.push(stat.getPath());
            }

            if (op.isDifferent(stat)) {
              ++opCount;
              if (++synCount > SYNC_FILE_MAX) {
                opWriter.sync();
                synCount = 0;
              }
              Path f = stat.getPath();
              opWriter.append(new Text(f.toString()), new FileOperation(f, op));
            }
          }
        }
      }
    }

    checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
    jobconf.setInt(OP_COUNT_LABEL, opCount);
    LOG.info(OP_COUNT_LABEL + "=" + opCount);
    jobconf.setNumMapTasks(getMapCount(opCount,
        new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
    return opCount != 0;
  }

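  /**
   * Sort the operation list by source path and scan adjacent entries:
   * sorting makes duplicates neighbors, so a single pass detects them.
   */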
  private static void checkDuplication(FileSystem fs, Path file, Path sorted,
      Configuration conf) throws IOException {
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
        new Text.Comparator(), Text.class, FileOperation.class, conf);
    sorter.sort(file, sorted);
    try (SequenceFile.Reader in = new SequenceFile.Reader(fs, sorted, conf)) {
      FileOperation curop = new FileOperation();
      Text prevsrc = null, cursrc = new Text();
      while (in.next(cursrc, curop)) {
        if (prevsrc != null && cursrc.equals(prevsrc)) {
          throw new DuplicationException(
            "Invalid input, there are duplicated files in the sources: "
            + prevsrc + ", " + cursrc);
        }
        prevsrc = cursrc;
        cursrc = new Text();
        curop = new FileOperation();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
  }
}