/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.io.file.tfile;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Several related classes to support chunk-encoded sub-streams on top of a
 * regular stream.
 */
final class Chunk {

  /**
   * Prevent the instantiation of class.
   */
  private Chunk() {
    // nothing
  }

  /**
   * Decoding a chain of chunks encoded through ChunkEncoder or
   * SingleChunkEncoder.
   *
   * Each chunk is a VInt length header followed by that many payload bytes.
   * A negative header marks a non-terminal chunk (its length is the negated
   * value); a non-negative header marks the last chunk.
   */
  public static class ChunkDecoder extends InputStream {
    /** The stream the chunk-encoded data is read from. */
    private DataInputStream in = null;
    /** True once the header of the terminal chunk has been read. */
    private boolean lastChunk;
    /** Payload bytes of the current chunk that have not been consumed yet. */
    private int remain = 0;
    /** True when this decoder is closed (or was never attached to a stream). */
    private boolean closed;

    /**
     * Create a decoder that is not attached to any stream. It behaves as a
     * closed, exhausted stream until {@link #reset(DataInputStream)} is called.
     */
    public ChunkDecoder() {
      lastChunk = true;
      closed = true;
    }

    /**
     * Re-attach this decoder to a new source stream and clear all decoding
     * state, so that the instance can be reused.
     *
     * @param downStream
     *          The source input stream which contains chunk-encoded data
     *          stream.
     */
    public void reset(DataInputStream downStream) {
      // no need to wind forward the old input.
      in = downStream;
      lastChunk = false;
      remain = 0;
      closed = false;
    }

    /**
     * Constructor
     *
     * @param in
     *          The source input stream which contains chunk-encoded data
     *          stream.
     */
    public ChunkDecoder(DataInputStream in) {
      this.in = in;
      lastChunk = false;
      closed = false;
    }

    /**
     * Have we reached the last chunk.
     *
     * May read chunk length headers from the source stream to find out.
     *
     * @return true if we have reached the last chunk.
     * @throws java.io.IOException
     *           if reading a chunk length fails.
     */
    public boolean isLastChunk() throws IOException {
      checkEOF();
      return lastChunk;
    }

    /**
     * How many bytes remain in the current chunk?
     *
     * May read chunk length headers from the source stream to find out.
     *
     * @return remaining bytes left in the current chunk.
     * @throws java.io.IOException
     *           if reading a chunk length fails.
     */
    public int getRemain() throws IOException {
      checkEOF();
      return remain;
    }

    /**
     * Reading the length of next chunk.
     *
     * A non-negative length marks the last chunk; a negative length is negated
     * to obtain the size of a non-terminal chunk.
     *
     * @throws java.io.IOException
     *           when no more data is available, or when the decoded length
     *           cannot be a valid chunk size.
     */
    private void readLength() throws IOException {
      int length = Utils.readVInt(in);
      if (length >= 0) {
        lastChunk = true;
        remain = length;
        return;
      }
      if (length == Integer.MIN_VALUE) {
        // -Integer.MIN_VALUE overflows back to a negative number; treating it
        // as a chunk size would make the decoder parse payload bytes as the
        // next length header.
        throw new IOException("Corrupted chunk encoding stream");
      }
      remain = -length;
    }

    /**
     * Check whether we reach the end of the stream.
     *
     * Reads chunk length headers as needed until either a chunk with payload
     * bytes is found or the last chunk has been exhausted.
     *
     * @return false if the chunk encoded stream has more data to read (in which
     *         case available() will be greater than 0); true otherwise.
     * @throws java.io.IOException
     *           on I/O errors.
     */
    private boolean checkEOF() throws IOException {
      if (isClosed()) {
        return true;
      }
      while (remain <= 0) {
        if (lastChunk) {
          return true;
        }
        readLength();
      }
      return false;
    }

    /**
     * This method never blocks the caller. Returning 0 does not mean we reach
     * the end of the stream.
     *
     * @return the number of unread bytes in the current chunk, without reading
     *         ahead to the next chunk header.
     */
    @Override
    public int available() {
      return remain;
    }

    /**
     * Read one byte of chunk payload.
     *
     * @return the byte read, or -1 when the chunk-encoded stream is exhausted
     *         or closed.
     * @throws java.io.IOException
     *           if the source stream ends before the advertised chunk length.
     */
    @Override
    public int read() throws IOException {
      if (checkEOF()) {
        return -1;
      }
      int ret = in.read();
      if (ret < 0) {
        throw new IOException("Corrupted chunk encoding stream");
      }
      --remain;
      return ret;
    }

    @Override
    public int read(byte[] b) throws IOException {
      return read(b, 0, b.length);
    }

    /**
     * Read up to <code>len</code> bytes of chunk payload. A single call never
     * crosses a chunk boundary, so fewer bytes than requested may be returned.
     *
     * @param b
     *          destination buffer.
     * @param off
     *          offset in <code>b</code> at which to start storing bytes.
     * @param len
     *          maximum number of bytes to read.
     * @return the number of bytes read, or -1 when the chunk-encoded stream is
     *         exhausted or closed.
     * @throws java.io.IOException
     *           if the source stream ends before the advertised chunk length.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      // A single sign test covers negative off/len, off+len overflow, and
      // off+len running past the end of b.
      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();
      }

      if (checkEOF()) {
        return -1;
      }
      int n = Math.min(remain, len);
      int ret = in.read(b, off, n);
      if (ret < 0) {
        throw new IOException("Corrupted chunk encoding stream");
      }
      remain -= ret;
      return ret;
    }

    /**
     * Skip over payload bytes. A single call never crosses a chunk boundary.
     *
     * @param n
     *          the number of bytes to skip.
     * @return the number of bytes actually skipped as reported by the source
     *         stream; 0 when the chunk-encoded stream is exhausted or closed.
     * @throws java.io.IOException
     *           on I/O errors.
     */
    @Override
    public long skip(long n) throws IOException {
      if (checkEOF()) {
        return 0;
      }
      long ret = in.skip(Math.min(remain, n));
      remain -= ret;
      return ret;
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    /**
     * @return true if this decoder has been closed or has never been attached
     *         to a stream.
     */
    public boolean isClosed() {
      return closed;
    }

    /**
     * Consume whatever is left of the chunk-encoded stream and mark this
     * decoder closed. The source stream itself is not closed.
     *
     * @throws java.io.IOException
     *           on I/O errors, or if the source stream ends before all
     *           advertised chunk bytes have been consumed.
     */
    @Override
    public void close() throws IOException {
      if (closed) {
        return;
      }
      try {
        while (!checkEOF()) {
          if (skip(Integer.MAX_VALUE) <= 0) {
            // skip() is allowed to make no progress (for example when the
            // source is truncated). Fall back to read(): it either consumes
            // one byte or reports the corruption, so this loop cannot spin
            // forever.
            read();
          }
        }
      } finally {
        closed = true;
      }
    }
  }

  /**
   * Chunk Encoder. Encoding the output data into a chain of chunks in the
   * following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n,
   * byte[len_n]. Where len1, len2, ..., len_n are the lengths of the data
   * chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks
   * cannot have length 0. All lengths are in the range of 0 to
   * Integer.MAX_VALUE and are encoded in Utils.VInt format.
   */
  public static class ChunkEncoder extends OutputStream {
    /**
     * The data output stream it connects to.
     */
    private DataOutputStream out;

    /**
     * The internal buffer that is only used when we do not know the advertised
     * size.
     */
    private byte[] buf;

    /**
     * The number of valid bytes in the buffer. This value is always in the
     * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
     * through <tt>buf[count-1]</tt> contain valid byte data.
     */
    private int count;

    /**
     * Constructor.
     *
     * @param out
     *          the underlying output stream.
     * @param buf
     *          user-supplied buffer. The buffer would be used exclusively by
     *          the ChunkEncoder during its life cycle.
     */
    public ChunkEncoder(DataOutputStream out, byte[] buf) {
      this.out = out;
      this.buf = buf;
      this.count = 0;
    }

    /**
     * Write out a chunk.
     *
     * @param chunk
     *          The chunk buffer.
     * @param offset
     *          Offset to chunk buffer for the beginning of chunk.
     * @param len
     *          Number of bytes in the chunk.
     * @param last
     *          Is this the terminal chunk of the stream?
     * @throws java.io.IOException
     *           on I/O errors.
     */
    private void writeChunk(byte[] chunk, int offset, int len, boolean last)
        throws IOException {
      if (last) {
        // always write out the length for the last chunk, even when it is
        // empty: the non-negative length is what terminates the chain.
        Utils.writeVInt(out, len);
        if (len > 0) {
          out.write(chunk, offset, len);
        }
        return;
      }
      // Non-terminal chunks cannot be empty, so a zero-length one is dropped.
      if (len > 0) {
        Utils.writeVInt(out, -len);
        out.write(chunk, offset, len);
      }
    }

    /**
     * Write out a chunk that is a concatenation of the internal buffer plus
     * user supplied data. This will never be the last block.
     *
     * @param data
     *          User supplied data buffer.
     * @param offset
     *          Offset to user data buffer.
     * @param len
     *          User data buffer size.
     * @throws java.io.IOException
     *           on I/O errors.
     */
    private void writeBufData(byte[] data, int offset, int len)
        throws IOException {
      if (count + len > 0) {
        Utils.writeVInt(out, -(count + len));
        out.write(buf, 0, count);
        count = 0;
        out.write(data, offset, len);
      }
    }

    /**
     * Flush the internal buffer as a non-terminal chunk. Does nothing when the
     * buffer is empty.
     *
     * @throws java.io.IOException
     *           on I/O errors.
     */
    private void flushBuffer() throws IOException {
      if (count > 0) {
        writeChunk(buf, 0, count, false);
        count = 0;
      }
    }

    @Override
    public void write(int b) throws IOException {
      if (count >= buf.length) {
        flushBuffer();
      }
      buf[count++] = (byte) b;
    }

    @Override
    public void write(byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if ((len + count) >= buf.length) {
        /*
         * If the input data do not fit in buffer, emit the buffered bytes and
         * the input data together as one non-terminal chunk, bypassing the
         * buffer. In this way buffered streams will cascade harmlessly.
         */
        writeBufData(b, off, len);
        return;
      }

      System.arraycopy(b, off, buf, count, len);
      count += len;
    }

    /**
     * Write any buffered bytes as a non-terminal chunk and flush the
     * underlying stream.
     */
    @Override
    public void flush() throws IOException {
      flushBuffer();
      out.flush();
    }

    /**
     * Write the buffered bytes as the terminal chunk and release the buffer
     * and the stream reference. The underlying stream is not closed here.
     * Calling close() again is a no-op.
     */
    @Override
    public void close() throws IOException {
      if (buf == null) {
        return;
      }
      try {
        writeChunk(buf, 0, count, true);
      } finally {
        buf = null;
        out = null;
      }
    }
  }

  /**
   * Encode the whole stream as a single chunk. Expecting to know the size of
   * the chunk up-front.
   */
  public static class SingleChunkEncoder extends OutputStream {
    /**
     * The data output stream it connects to.
     */
    private final DataOutputStream out;

    /**
     * The remaining bytes to be written.
     */
    private int remain;

    /** True once close() has been called. */
    private boolean closed = false;

    /**
     * Constructor. Writes the chunk length header immediately.
     *
     * @param out
     *          the underlying output stream.
     * @param size
     *          The total # of bytes to be written as a single chunk.
     * @throws java.io.IOException
     *           if an I/O error occurs.
     */
    public SingleChunkEncoder(DataOutputStream out, int size)
        throws IOException {
      this.out = out;
      this.remain = size;
      Utils.writeVInt(out, size);
    }

    @Override
    public void write(int b) throws IOException {
      if (remain <= 0) {
        throw new IOException("Writing more bytes than advertised size.");
      }
      out.write(b);
      --remain;
    }

    @Override
    public void write(byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if (remain < len) {
        throw new IOException("Writing more bytes than advertised size.");
      }
      out.write(b, off, len);
      remain -= len;
    }

    @Override
    public void flush() throws IOException {
      out.flush();
    }

    /**
     * Mark this encoder closed and verify that exactly the advertised number
     * of bytes was written. The underlying stream is not closed here. Calling
     * close() again is a no-op.
     *
     * @throws java.io.IOException
     *           if fewer bytes than advertised were written.
     */
    @Override
    public void close() throws IOException {
      if (closed) {
        return;
      }

      try {
        if (remain > 0) {
          throw new IOException("Writing less bytes than advertised size.");
        }
      } finally {
        closed = true;
      }
    }
  }
}