/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.shell;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.zip.GZIPInputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.util.MinimalPrettyPrinter;

/**
 * Display contents or checksums of files
 */
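// Shell usage (illustrative): `hadoop fs -cat <src>`, `hadoop fs -text <src>`,
// and `hadoop fs -checksum <src>`; the subcommands are registered below.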
@InterfaceAudience.Private
@InterfaceStability.Evolving

class Display extends FsCommand {
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(Cat.class, "-cat");
    factory.addClass(Text.class, "-text");
    factory.addClass(Checksum.class, "-checksum");
  }

  /**
   * Displays file content to stdout
   */
  public static class Cat extends Display {
    public static final String NAME = "cat";
    public static final String USAGE = "[-ignoreCrc] <src> ...";
    public static final String DESCRIPTION =
      "Fetch all files that match the file pattern <src> " +
      "and display their content on stdout.\n";

    private boolean verifyChecksum = true;

    @Override
    protected void processOptions(LinkedList<String> args)
    throws IOException {
      CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "ignoreCrc");
      cf.parse(args);
      verifyChecksum = !cf.getOpt("ignoreCrc");
    }

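    // -ignoreCrc maps to verifyChecksum=false: processPath below passes the
    // flag to the target filesystem so reads skip CRC verification.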
    @Override
    protected void processPath(PathData item) throws IOException {
      if (item.stat.isDirectory()) {
        throw new PathIsDirectoryException(item.toString());
      }

      item.fs.setVerifyChecksum(verifyChecksum);
      printToStdout(getInputStream(item));
    }

    private void printToStdout(InputStream in) throws IOException {
      try {
        IOUtils.copyBytes(in, out, getConf(), false);
      } finally {
        in.close();
      }
    }

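    // copyBytes above runs with close=false so the shared stdout stream stays
    // open across multiple files; getInputStream is the hook that Text
    // overrides to decode compressed and container formats.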
    protected InputStream getInputStream(PathData item) throws IOException {
      return item.fs.open(item.path);
    }
  }

  /**
   * Same behavior as "-cat", but handles gzip, SequenceFile
   * (via TextRecordInputStream), and Avro encodings.
   */
  public static class Text extends Cat {
    public static final String NAME = "text";
    public static final String USAGE = Cat.USAGE;
    public static final String DESCRIPTION =
      "Takes a source file and outputs the file in text format.\n" +
      "The allowed formats are gzip, SequenceFile, and Avro.";

    @Override
    protected InputStream getInputStream(PathData item) throws IOException {
      FSDataInputStream i = (FSDataInputStream)super.getInputStream(item);

      // Handle 0 and 1-byte files: readShort() needs two bytes, so an
      // EOFException here means the file is too short to carry a magic number.
      short leadBytes;
      try {
        leadBytes = i.readShort();
      } catch (EOFException e) {
        i.seek(0);
        return i;
      }

      // Check the type of stream first: the two lead bytes, read as a
      // big-endian short, act as a magic number for known container formats.
      switch(leadBytes) {
        case 0x1f8b: { // RFC 1952
          // Must be gzip
          i.seek(0);
          return new GZIPInputStream(i);
        }
        case 0x5345: { // 'S' 'E'
          // Might be a SequenceFile, whose header starts with "SEQ"
          if (i.readByte() == 'Q') {
            i.close();
            return new TextRecordInputStream(item.stat);
          }
        }
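        // Deliberate fall-through: if the third byte was not 'Q' this is not
        // a SequenceFile, so control drops into the default branch below and
        // codec detection by file extension is attempted instead.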
        default: {
          // No known magic number; fall back to the CompressionCodecFactory,
          // which picks a codec based on the path's file extension.
          CompressionCodecFactory cf = new CompressionCodecFactory(getConf());
          CompressionCodec codec = cf.getCodec(item.path);
          if (codec != null) {
            i.seek(0);
            return codec.createInputStream(i);
          }
          break;
        }
        case 0x4f62: { // 'O' 'b'
          // An Avro data file's header starts with "Obj"
          if (i.readByte() == 'j') {
            i.close();
            return new AvroFileInputStream(item.stat);
          }
          break;
        }
      }

      // File is non-compressed, or not a file container we know.
      i.seek(0);
      return i;
    }
  }
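
  // Example (hypothetical paths): `hadoop fs -text /logs/app.gz` streams the
  // decompressed text, a SequenceFile prints one "key<TAB>value" line per
  // record, and an Avro data file prints one JSON datum per line.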

  public static class Checksum extends Display {
    public static final String NAME = "checksum";
    public static final String USAGE = "<src> ...";
    public static final String DESCRIPTION =
      "Dump checksum information for files that match the file " +
      "pattern <src> to stdout. Note that this requires a round-trip " +
      "to a datanode storing each block of the file, and thus is not " +
      "efficient to run on a large number of files. The checksum of a " +
      "file depends on its content, block size and the checksum " +
      "algorithm and parameters used for creating the file.";
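
    // Output is one tab-separated line per file: path, checksum algorithm
    // name, and the checksum bytes in hex, or "NONE" when the underlying
    // filesystem does not provide a checksum.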

    @Override
    protected void processPath(PathData item) throws IOException {
      if (item.stat.isDirectory()) {
        throw new PathIsDirectoryException(item.toString());
      }

      FileChecksum checksum = item.fs.getFileChecksum(item.path);
      if (checksum == null) {
        out.printf("%s\tNONE\t%n", item.toString());
      } else {
        String checksumString = StringUtils.byteToHexString(
            checksum.getBytes(), 0, checksum.getLength());
        out.printf("%s\t%s\t%s%n",
            item.toString(), checksum.getAlgorithmName(),
            checksumString);
      }
    }
  }

  protected class TextRecordInputStream extends InputStream {
    SequenceFile.Reader r;
    WritableComparable<?> key;
    Writable val;

    DataInputBuffer inbuf;
    DataOutputBuffer outbuf;
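
    // Each record is rendered as "key<TAB>value<NEWLINE>" into outbuf, then
    // served back one byte at a time through inbuf until it is drained.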

    public TextRecordInputStream(FileStatus f) throws IOException {
      final Path fpath = f.getPath();
      final Configuration lconf = getConf();
      r = new SequenceFile.Reader(lconf,
          SequenceFile.Reader.file(fpath));
      key = ReflectionUtils.newInstance(
          r.getKeyClass().asSubclass(WritableComparable.class), lconf);
      val = ReflectionUtils.newInstance(
          r.getValueClass().asSubclass(Writable.class), lconf);
      inbuf = new DataInputBuffer();
      outbuf = new DataOutputBuffer();
    }

    @Override
    public int read() throws IOException {
      int ret;
      if (null == inbuf || -1 == (ret = inbuf.read())) {
        // Buffer drained: advance to the next record and re-fill it.
        if (!r.next(key, val)) {
          return -1;
        }
        byte[] tmp = key.toString().getBytes(Charsets.UTF_8);
        outbuf.write(tmp, 0, tmp.length);
        outbuf.write('\t');
        tmp = val.toString().getBytes(Charsets.UTF_8);
        outbuf.write(tmp, 0, tmp.length);
        outbuf.write('\n');
        inbuf.reset(outbuf.getData(), outbuf.getLength());
        outbuf.reset();
        ret = inbuf.read();
      }
      return ret;
    }

    @Override
    public void close() throws IOException {
      r.close();
      super.close();
    }
  }

  /**
   * This class transforms a binary Avro data file into an InputStream
   * with data that is in a human readable JSON format.
   */
  protected static class AvroFileInputStream extends InputStream {
    private int pos;
    private byte[] buffer;
    private ByteArrayOutputStream output;
    private FileReader<?> fileReader;
    private DatumWriter<Object> writer;
    private JsonEncoder encoder;
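
    // Each Avro record is re-encoded as JSON into `output`, snapshotted into
    // `buffer`, and then served to the caller one byte at a time.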

    public AvroFileInputStream(FileStatus status) throws IOException {
      pos = 0;
      buffer = new byte[0];
      GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
      FileContext fc = FileContext.getFileContext(new Configuration());
      fileReader =
        DataFileReader.openReader(new AvroFSInput(fc, status.getPath()), reader);
      Schema schema = fileReader.getSchema();
      writer = new GenericDatumWriter<Object>(schema);
      output = new ByteArrayOutputStream();
      JsonGenerator generator =
        new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
      MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
      prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
      generator.setPrettyPrinter(prettyPrinter);
      encoder = EncoderFactory.get().jsonEncoder(schema, generator);
    }
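
    // Using MinimalPrettyPrinter with the platform line separator as the root
    // value separator yields exactly one JSON record per output line.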

    /**
     * Read a single byte from the stream.
     */
    @Override
    public int read() throws IOException {
      if (pos < buffer.length) {
        // Mask to an unsigned value: InputStream.read() must return 0-255.
        return buffer[pos++] & 0xff;
      }
      if (!fileReader.hasNext()) {
        return -1;
      }
      writer.write(fileReader.next(), encoder);
      encoder.flush();
      if (!fileReader.hasNext()) {
        // Write a new line after the last Avro record.
        output.write(System.getProperty("line.separator")
                         .getBytes(Charsets.UTF_8));
        output.flush();
      }
      pos = 0;
      buffer = output.toByteArray();
      output.reset();
      return read();
    }

    /**
     * Close the stream.
     */
    @Override
    public void close() throws IOException {
      fileReader.close();
      output.close();
      super.close();
    }
  }
}