1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.fs.shell; 19 20 import java.io.ByteArrayOutputStream; 21 import java.io.EOFException; 22 import java.io.File; 23 import java.io.IOException; 24 import java.io.InputStream; 25 import java.util.LinkedList; 26 import java.util.zip.GZIPInputStream; 27 28 import org.apache.avro.Schema; 29 import org.apache.avro.file.DataFileReader; 30 import org.apache.avro.file.FileReader; 31 import org.apache.avro.generic.GenericDatumReader; 32 import org.apache.avro.generic.GenericDatumWriter; 33 import org.apache.avro.io.DatumWriter; 34 import org.apache.avro.io.EncoderFactory; 35 import org.apache.avro.io.JsonEncoder; 36 import org.apache.commons.io.Charsets; 37 import org.apache.hadoop.classification.InterfaceAudience; 38 import org.apache.hadoop.classification.InterfaceStability; 39 import org.apache.hadoop.conf.Configuration; 40 import org.apache.hadoop.fs.AvroFSInput; 41 import org.apache.hadoop.fs.FSDataInputStream; 42 import org.apache.hadoop.fs.FileChecksum; 43 import org.apache.hadoop.fs.FileContext; 44 import org.apache.hadoop.fs.FileStatus; 45 import org.apache.hadoop.fs.Path; 46 import org.apache.hadoop.fs.PathIsDirectoryException; 47 import org.apache.hadoop.io.DataInputBuffer; 48 import org.apache.hadoop.io.DataOutputBuffer; 49 import org.apache.hadoop.io.IOUtils; 50 import org.apache.hadoop.io.SequenceFile; 51 import org.apache.hadoop.io.Writable; 52 import org.apache.hadoop.io.WritableComparable; 53 import org.apache.hadoop.io.compress.CompressionCodec; 54 import org.apache.hadoop.io.compress.CompressionCodecFactory; 55 import org.apache.hadoop.util.ReflectionUtils; 56 import org.apache.hadoop.util.StringUtils; 57 import org.codehaus.jackson.JsonEncoding; 58 import org.codehaus.jackson.JsonFactory; 59 import org.codehaus.jackson.JsonGenerator; 60 import org.codehaus.jackson.util.MinimalPrettyPrinter; 61 62 /** 63 * Display contents or checksums of files 64 */ 65 @InterfaceAudience.Private 66 @InterfaceStability.Evolving 67 68 class Display extends FsCommand { registerCommands(CommandFactory factory)69 public static void registerCommands(CommandFactory factory) { 70 factory.addClass(Cat.class, "-cat"); 71 factory.addClass(Text.class, "-text"); 72 factory.addClass(Checksum.class, "-checksum"); 73 } 74 75 /** 76 * Displays file content to stdout 77 */ 78 public static class Cat extends Display { 79 public static final String NAME = "cat"; 80 public static final String USAGE = "[-ignoreCrc] <src> ..."; 81 public static final String DESCRIPTION = 82 "Fetch all files that match the file pattern <src> " + 83 "and display their content on stdout.\n"; 84 85 private boolean verifyChecksum = true; 86 87 @Override processOptions(LinkedList<String> args)88 protected void processOptions(LinkedList<String> args) 89 throws IOException { 90 CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "ignoreCrc"); 91 cf.parse(args); 92 verifyChecksum = !cf.getOpt("ignoreCrc"); 93 } 94 95 @Override processPath(PathData item)96 protected void processPath(PathData item) throws IOException { 97 if (item.stat.isDirectory()) { 98 throw new PathIsDirectoryException(item.toString()); 99 } 100 101 item.fs.setVerifyChecksum(verifyChecksum); 102 printToStdout(getInputStream(item)); 103 } 104 printToStdout(InputStream in)105 private void printToStdout(InputStream in) throws IOException { 106 try { 107 IOUtils.copyBytes(in, out, getConf(), false); 108 } finally { 109 in.close(); 110 } 111 } 112 getInputStream(PathData item)113 protected InputStream getInputStream(PathData item) throws IOException { 114 return item.fs.open(item.path); 115 } 116 } 117 118 /** 119 * Same behavior as "-cat", but handles zip and TextRecordInputStream 120 * and Avro encodings. 121 */ 122 public static class Text extends Cat { 123 public static final String NAME = "text"; 124 public static final String USAGE = Cat.USAGE; 125 public static final String DESCRIPTION = 126 "Takes a source file and outputs the file in text format.\n" + 127 "The allowed formats are zip and TextRecordInputStream and Avro."; 128 129 @Override getInputStream(PathData item)130 protected InputStream getInputStream(PathData item) throws IOException { 131 FSDataInputStream i = (FSDataInputStream)super.getInputStream(item); 132 133 // Handle 0 and 1-byte files 134 short leadBytes; 135 try { 136 leadBytes = i.readShort(); 137 } catch (EOFException e) { 138 i.seek(0); 139 return i; 140 } 141 142 // Check type of stream first 143 switch(leadBytes) { 144 case 0x1f8b: { // RFC 1952 145 // Must be gzip 146 i.seek(0); 147 return new GZIPInputStream(i); 148 } 149 case 0x5345: { // 'S' 'E' 150 // Might be a SequenceFile 151 if (i.readByte() == 'Q') { 152 i.close(); 153 return new TextRecordInputStream(item.stat); 154 } 155 } 156 default: { 157 // Check the type of compression instead, depending on Codec class's 158 // own detection methods, based on the provided path. 159 CompressionCodecFactory cf = new CompressionCodecFactory(getConf()); 160 CompressionCodec codec = cf.getCodec(item.path); 161 if (codec != null) { 162 i.seek(0); 163 return codec.createInputStream(i); 164 } 165 break; 166 } 167 case 0x4f62: { // 'O' 'b' 168 if (i.readByte() == 'j') { 169 i.close(); 170 return new AvroFileInputStream(item.stat); 171 } 172 break; 173 } 174 } 175 176 // File is non-compressed, or not a file container we know. 177 i.seek(0); 178 return i; 179 } 180 } 181 182 public static class Checksum extends Display { 183 public static final String NAME = "checksum"; 184 public static final String USAGE = "<src> ..."; 185 public static final String DESCRIPTION = 186 "Dump checksum information for files that match the file " + 187 "pattern <src> to stdout. Note that this requires a round-trip " + 188 "to a datanode storing each block of the file, and thus is not " + 189 "efficient to run on a large number of files. The checksum of a " + 190 "file depends on its content, block size and the checksum " + 191 "algorithm and parameters used for creating the file."; 192 193 @Override processPath(PathData item)194 protected void processPath(PathData item) throws IOException { 195 if (item.stat.isDirectory()) { 196 throw new PathIsDirectoryException(item.toString()); 197 } 198 199 FileChecksum checksum = item.fs.getFileChecksum(item.path); 200 if (checksum == null) { 201 out.printf("%s\tNONE\t%n", item.toString()); 202 } else { 203 String checksumString = StringUtils.byteToHexString( 204 checksum.getBytes(), 0, checksum.getLength()); 205 out.printf("%s\t%s\t%s%n", 206 item.toString(), checksum.getAlgorithmName(), 207 checksumString); 208 } 209 } 210 } 211 212 protected class TextRecordInputStream extends InputStream { 213 SequenceFile.Reader r; 214 WritableComparable<?> key; 215 Writable val; 216 217 DataInputBuffer inbuf; 218 DataOutputBuffer outbuf; 219 TextRecordInputStream(FileStatus f)220 public TextRecordInputStream(FileStatus f) throws IOException { 221 final Path fpath = f.getPath(); 222 final Configuration lconf = getConf(); 223 r = new SequenceFile.Reader(lconf, 224 SequenceFile.Reader.file(fpath)); 225 key = ReflectionUtils.newInstance( 226 r.getKeyClass().asSubclass(WritableComparable.class), lconf); 227 val = ReflectionUtils.newInstance( 228 r.getValueClass().asSubclass(Writable.class), lconf); 229 inbuf = new DataInputBuffer(); 230 outbuf = new DataOutputBuffer(); 231 } 232 233 @Override read()234 public int read() throws IOException { 235 int ret; 236 if (null == inbuf || -1 == (ret = inbuf.read())) { 237 if (!r.next(key, val)) { 238 return -1; 239 } 240 byte[] tmp = key.toString().getBytes(Charsets.UTF_8); 241 outbuf.write(tmp, 0, tmp.length); 242 outbuf.write('\t'); 243 tmp = val.toString().getBytes(Charsets.UTF_8); 244 outbuf.write(tmp, 0, tmp.length); 245 outbuf.write('\n'); 246 inbuf.reset(outbuf.getData(), outbuf.getLength()); 247 outbuf.reset(); 248 ret = inbuf.read(); 249 } 250 return ret; 251 } 252 253 @Override close()254 public void close() throws IOException { 255 r.close(); 256 super.close(); 257 } 258 } 259 260 /** 261 * This class transforms a binary Avro data file into an InputStream 262 * with data that is in a human readable JSON format. 263 */ 264 protected static class AvroFileInputStream extends InputStream { 265 private int pos; 266 private byte[] buffer; 267 private ByteArrayOutputStream output; 268 private FileReader<?> fileReader; 269 private DatumWriter<Object> writer; 270 private JsonEncoder encoder; 271 AvroFileInputStream(FileStatus status)272 public AvroFileInputStream(FileStatus status) throws IOException { 273 pos = 0; 274 buffer = new byte[0]; 275 GenericDatumReader<Object> reader = new GenericDatumReader<Object>(); 276 FileContext fc = FileContext.getFileContext(new Configuration()); 277 fileReader = 278 DataFileReader.openReader(new AvroFSInput(fc, status.getPath()),reader); 279 Schema schema = fileReader.getSchema(); 280 writer = new GenericDatumWriter<Object>(schema); 281 output = new ByteArrayOutputStream(); 282 JsonGenerator generator = 283 new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8); 284 MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter(); 285 prettyPrinter.setRootValueSeparator(System.getProperty("line.separator")); 286 generator.setPrettyPrinter(prettyPrinter); 287 encoder = EncoderFactory.get().jsonEncoder(schema, generator); 288 } 289 290 /** 291 * Read a single byte from the stream. 292 */ 293 @Override read()294 public int read() throws IOException { 295 if (pos < buffer.length) { 296 return buffer[pos++]; 297 } 298 if (!fileReader.hasNext()) { 299 return -1; 300 } 301 writer.write(fileReader.next(), encoder); 302 encoder.flush(); 303 if (!fileReader.hasNext()) { 304 // Write a new line after the last Avro record. 305 output.write(System.getProperty("line.separator") 306 .getBytes(Charsets.UTF_8)); 307 output.flush(); 308 } 309 pos = 0; 310 buffer = output.toByteArray(); 311 output.reset(); 312 return read(); 313 } 314 315 /** 316 * Close the stream. 317 */ 318 @Override close()319 public void close() throws IOException { 320 fileReader.close(); 321 output.close(); 322 super.close(); 323 } 324 } 325 } 326