/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.slive;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * Reads segments (a header followed by data) from a stream and checks every
 * {@code BYTES_PER_LONG}-sized chunk of the data against the value a
 * {@link DataHasher}, seeded from the segment header, generates for that
 * chunk's offset.
 */
class DataVerifier {
  private static final int BYTES_PER_LONG = Constants.BYTES_PER_LONG;

  /** Maximum number of data bytes pulled from the stream per read. */
  private final int bufferSize;

  /**
   * The output from verification: the number of chunks that matched the
   * expected value, the number of chunks that did not, the total number of
   * bytes read and the time spent reading.
   */
  static class VerifyOutput {
    private final long same;
    private final long different;
    private final long read;
    private final long readTime;

    VerifyOutput(long sameChunks, long differentChunks, long readBytes,
        long readTime) {
      this.same = sameChunks;
      this.different = differentChunks;
      this.read = readBytes;
      this.readTime = readTime;
    }

    /** @return the accumulated read time (reported as milliseconds) */
    long getReadTime() {
      return this.readTime;
    }

    /** @return the total number of bytes read (headers included) */
    long getBytesRead() {
      return this.read;
    }

    /** @return the number of chunks that matched the expected value */
    long getChunksSame() {
      return same;
    }

    /** @return the number of chunks that differed from the expected value */
    long getChunksDifferent() {
      return different;
    }

    @Override
    public String toString() {
      return "Bytes read = " + getBytesRead() + " same = " + getChunksSame()
          + " different = " + getChunksDifferent() + " in " + getReadTime()
          + " milliseconds";
    }

  }

  /**
   * Holds the result of reading one segment header.
   */
  private static class ReadInfo {
    private final long byteAm;
    private final long hash;
    private final long timeTaken;
    private final long bytesRead;

    ReadInfo(long byteAm, long hash, long timeTaken, long bytesRead) {
      this.byteAm = byteAm;
      this.hash = hash;
      this.timeTaken = timeTaken;
      this.bytesRead = bytesRead;
    }

    /** @return the data byte amount announced by the header */
    long getByteAm() {
      return byteAm;
    }

    /** @return the hash value stored in the header */
    long getHashValue() {
      return hash;
    }

    /** @return the time spent reading the header */
    long getTimeTaken() {
      return timeTaken;
    }

    /** @return the number of bytes the header itself occupied */
    long getBytesRead() {
      return bytesRead;
    }

  }

  /**
   * Holds the matching / non-matching chunk counts produced by verifying a
   * single buffer.
   */
  private static class VerifyInfo {
    private final long same;
    private final long different;

    VerifyInfo(long same, long different) {
      this.same = same;
      this.different = different;
    }

    /** @return the number of chunks that matched */
    long getSame() {
      return same;
    }

    /** @return the number of chunks that did not match */
    long getDifferent() {
      return different;
    }
  }

  /**
   * Inits with the given buffer size.
   *
   * @param bufferSize
   *          size which must be greater than or equal to BYTES_PER_LONG and
   *          which also must be a multiple of BYTES_PER_LONG
   *
   * @throws IllegalArgumentException
   *           if the buffer size is smaller than BYTES_PER_LONG or is not a
   *           multiple of it
   */
  DataVerifier(int bufferSize) {
    if (bufferSize < BYTES_PER_LONG) {
      throw new IllegalArgumentException(
          "Buffer size must be greater than or equal to " + BYTES_PER_LONG);
    }
    if ((bufferSize % BYTES_PER_LONG) != 0) {
      throw new IllegalArgumentException("Buffer size must be a multiple of "
          + BYTES_PER_LONG);
    }
    this.bufferSize = bufferSize;
  }

  /**
   * Inits with the default buffer size
   */
  DataVerifier() {
    this(Constants.BUFFERSIZE);
  }

  /**
   * Verifies a buffer of a given size using the given start hash offset.
   * Bytes are consumed from the buffer's current position in groups of
   * BYTES_PER_LONG; each complete group is compared with the value the hasher
   * generates for its offset. A trailing incomplete group is compared only on
   * the bytes that were actually read.
   *
   * @param buf
   *          the buffer to verify (read from its current position)
   * @param size
   *          the number of bytes to be used in that buffer
   * @param startOffset
   *          the start hash offset
   * @param hasher
   *          the hasher to use for calculating expected values
   *
   * @return VerifyInfo with the number of chunks that matched and the number
   *         that differed
   */
  private VerifyInfo verifyBuffer(ByteBuffer buf, int size, long startOffset,
      DataHasher hasher) {
    ByteBuffer cmpBuf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
    long hashOffset = startOffset;
    long chunksSame = 0;
    long chunksDifferent = 0;
    for (int i = 0; i < size; ++i) {
      cmpBuf.put(buf.get());
      if (!cmpBuf.hasRemaining()) {
        // a full chunk has been gathered - compare it as a long
        cmpBuf.rewind();
        long receivedData = cmpBuf.getLong();
        cmpBuf.rewind();
        long expected = hasher.generate(hashOffset);
        hashOffset += BYTES_PER_LONG;
        if (receivedData == expected) {
          ++chunksSame;
        } else {
          ++chunksDifferent;
        }
      }
    }
    // After the loop the position is 0 (chunk boundary) or the length of a
    // trailing partial chunk, which is always shorter than BYTES_PER_LONG.
    int partialLen = cmpBuf.position();
    if (partialLen != 0) {
      // The scratch buffer still holds stale bytes from the previous chunk
      // past partialLen, so zero both tails before comparing.
      while (cmpBuf.hasRemaining()) {
        cmpBuf.put((byte) 0);
      }
      long expected = hasher.generate(hashOffset);
      ByteBuffer expectedBuf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
      expectedBuf.putLong(expected);
      expectedBuf.position(partialLen);
      while (expectedBuf.hasRemaining()) {
        expectedBuf.put((byte) 0);
      }
      cmpBuf.rewind();
      expectedBuf.rewind();
      if (cmpBuf.equals(expectedBuf)) {
        ++chunksSame;
      } else {
        ++chunksDifferent;
      }
    }
    return new VerifyInfo(chunksSame, chunksDifferent);
  }

  /**
   * Determines the hash offset to use given a byte counter: the counter
   * (negative values are treated as zero) rounded down to a multiple of
   * BYTES_PER_LONG.
   *
   * @param byteRead
   *          the number of bytes read so far
   *
   * @return offset position
   */
  private long determineOffset(long byteRead) {
    long nonNegative = Math.max(byteRead, 0L);
    return (nonNegative / BYTES_PER_LONG) * BYTES_PER_LONG;
  }

  /**
   * Verifies a given number of bytes from a file - less number of bytes may be
   * read if a header can not be read in due to the byte limit
   *
   * @param byteAm
   *          the byte amount to limit to (should be less than or equal to file
   *          size)
   *
   * @param in
   *          the input stream to read from
   *
   * @return VerifyOutput with data about reads
   *
   * @throws IOException
   *           if a read failure occurs
   *
   * @throws BadFileException
   *           if a header announces a negative data amount or end of file is
   *           reached unexpectedly while reading data
   */
  VerifyOutput verifyFile(long byteAm, DataInputStream in)
      throws IOException, BadFileException {
    return verifyBytes(byteAm, 0, in);
  }

  /**
   * Verifies a given number of bytes from a file - less number of bytes may be
   * read if a header can not be read in due to the byte limit or because end
   * of file is hit while reading a header
   *
   * @param byteAm
   *          the byte amount to limit to (should be less than or equal to file
   *          size); header bytes count against this limit
   *
   * @param bytesRead
   *          the initial value of the bytes-read counter reported in the
   *          result (it does not influence the hash offsets)
   *
   * @param in
   *          the input stream to read from
   *
   * @return VerifyOutput with data about reads
   *
   * @throws IOException
   *           if a read failure occurs
   *
   * @throws BadFileException
   *           if a header announces a negative data amount or end of file is
   *           reached unexpectedly while reading data
   */
  private VerifyOutput verifyBytes(long byteAm, long bytesRead,
      DataInputStream in) throws IOException, BadFileException {
    if (byteAm <= 0) {
      return new VerifyOutput(0, 0, 0, 0);
    }
    long chunksSame = 0;
    long chunksDifferent = 0;
    long readTime = 0;
    long bytesLeft = byteAm; // remaining byte budget (headers + data)
    long bufLeft = 0; // data bytes still unread in the current segment
    long bufRead = 0; // data bytes already read from the current segment
    long seqNum = 0; // number of headers read so far (for error messages)
    DataHasher hasher = null;
    ByteBuffer readBuf = ByteBuffer.wrap(new byte[bufferSize]);
    while (bytesLeft > 0) {
      if (bufLeft <= 0) {
        if (bytesLeft < DataWriter.getHeaderLength()) {
          // no bytes left to read a header
          break;
        }
        // time to read a new header
        ReadInfo header;
        try {
          header = readHeader(in);
        } catch (EOFException e) {
          // eof ok on header reads
          // but not on data reads
          break;
        }
        ++seqNum;
        hasher = new DataHasher(header.getHashValue());
        bufLeft = header.getByteAm();
        readTime += header.getTimeTaken();
        bytesRead += header.getBytesRead();
        bytesLeft -= header.getBytesRead();
        bufRead = 0;
        // number of bytes to read greater than how many we want to read
        if (bufLeft > bytesLeft) {
          bufLeft = bytesLeft;
        }
        // does the buffer amount have anything??
        if (bufLeft <= 0) {
          continue;
        }
      }
      // read at most a buffer's worth, bounded by both the overall budget and
      // what is left of this segment; the result never exceeds bufferSize so
      // the narrowing cast is safe
      int bufSize = (int) Math.min(bufferSize, Math.min(bytesLeft, bufLeft));
      // read it in
      try {
        long startTime = Timer.now();
        in.readFully(readBuf.array(), 0, bufSize);
        readTime += Timer.elapsed(startTime);
      } catch (EOFException e) {
        throw new BadFileException(
            "Could not read the number of expected data bytes " + bufSize
                + " due to unexpected end of file during sequence " + seqNum, e);
      }
      // update the counters
      bytesRead += bufSize;
      bytesLeft -= bufSize;
      bufLeft -= bufSize;
      // the data went straight into the backing array, so reset the position
      // (left advanced by the previous verification) before verifying
      readBuf.rewind();
      // figure out the expected hash offset start point
      long vOffset = determineOffset(bufRead);
      // now update for new position
      bufRead += bufSize;
      // verify
      VerifyInfo verifyRes = verifyBuffer(readBuf, bufSize, vOffset, hasher);
      // update the verification counters
      chunksSame += verifyRes.getSame();
      chunksDifferent += verifyRes.getDifferent();
    }
    return new VerifyOutput(chunksSame, chunksDifferent, bytesRead, readTime);
  }


  /**
   * Reads a header from the given input stream. The first long of the header
   * is taken as the hash value and the second long as the amount of data
   * bytes that follow.
   *
   * @param in
   *          input stream to read from
   *
   * @return ReadInfo
   *
   * @throws IOException
   *           if a read error occurs or EOF occurs
   *
   * @throws BadFileException
   *           if the byte amount read is negative
   */
  ReadInfo readHeader(DataInputStream in) throws IOException,
      BadFileException {
    int headerLen = DataWriter.getHeaderLength();
    ByteBuffer headerBuf = ByteBuffer.wrap(new byte[headerLen]);
    long startTime = Timer.now();
    in.readFully(headerBuf.array());
    long elapsed = Timer.elapsed(startTime);
    headerBuf.rewind();
    long hashValue = headerBuf.getLong();
    long byteAvailable = headerBuf.getLong();
    if (byteAvailable < 0) {
      throw new BadFileException("Invalid negative amount " + byteAvailable
          + " determined for header data amount");
    }
    return new ReadInfo(byteAvailable, hashValue, elapsed, headerLen);
  }
}