1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.fs.slive;
20 
21 import java.io.DataInputStream;
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 
26 /**
27  * Class which reads in and verifies bytes that have been read in
28  */
29 class DataVerifier {
30   private static final int BYTES_PER_LONG = Constants.BYTES_PER_LONG;
31 
32   private int bufferSize;
33 
34   /**
35    * The output from verification includes the number of chunks that were the
36    * same as expected and the number of segments that were different than what
37    * was expected and the number of total bytes read
38    */
39   static class VerifyOutput {
40     private long same;
41     private long different;
42     private long read;
43     private long readTime;
44 
VerifyOutput(long sameChunks, long differentChunks, long readBytes, long readTime)45     VerifyOutput(long sameChunks, long differentChunks, long readBytes,
46         long readTime) {
47       this.same = sameChunks;
48       this.different = differentChunks;
49       this.read = readBytes;
50       this.readTime = readTime;
51     }
52 
getReadTime()53     long getReadTime() {
54       return this.readTime;
55     }
56 
getBytesRead()57     long getBytesRead() {
58       return this.read;
59     }
60 
getChunksSame()61     long getChunksSame() {
62       return same;
63     }
64 
getChunksDifferent()65     long getChunksDifferent() {
66       return different;
67     }
68 
toString()69     public String toString() {
70       return "Bytes read = " + getBytesRead() + " same = " + getChunksSame()
71           + " different = " + getChunksDifferent() + " in " + getReadTime()
72           + " milliseconds";
73     }
74 
75   }
76 
77   /**
78    * Class used to hold the result of a read on a header
79    */
80   private static class ReadInfo {
81     private long byteAm;
82     private long hash;
83     private long timeTaken;
84     private long bytesRead;
85 
ReadInfo(long byteAm, long hash, long timeTaken, long bytesRead)86     ReadInfo(long byteAm, long hash, long timeTaken, long bytesRead) {
87       this.byteAm = byteAm;
88       this.hash = hash;
89       this.timeTaken = timeTaken;
90       this.bytesRead = bytesRead;
91     }
92 
getByteAm()93     long getByteAm() {
94       return byteAm;
95     }
96 
getHashValue()97     long getHashValue() {
98       return hash;
99     }
100 
getTimeTaken()101     long getTimeTaken() {
102       return timeTaken;
103     }
104 
getBytesRead()105     long getBytesRead() {
106       return bytesRead;
107     }
108 
109   }
110 
111   /**
112    * Storage class used to hold the chunks same and different for buffered reads
113    * and the resultant verification
114    */
115   private static class VerifyInfo {
116 
VerifyInfo(long same, long different)117     VerifyInfo(long same, long different) {
118       this.same = same;
119       this.different = different;
120     }
121 
getSame()122     long getSame() {
123       return same;
124     }
125 
getDifferent()126     long getDifferent() {
127       return different;
128     }
129 
130     private long same;
131     private long different;
132   }
133 
134   /**
135    * Inits with given buffer size (must be greater than bytes per long and a
136    * multiple of bytes per long)
137    *
138    * @param bufferSize
139    *          size which must be greater than BYTES_PER_LONG and which also must
140    *          be a multiple of BYTES_PER_LONG
141    */
DataVerifier(int bufferSize)142   DataVerifier(int bufferSize) {
143     if (bufferSize < BYTES_PER_LONG) {
144       throw new IllegalArgumentException(
145           "Buffer size must be greater than or equal to " + BYTES_PER_LONG);
146     }
147     if ((bufferSize % BYTES_PER_LONG) != 0) {
148       throw new IllegalArgumentException("Buffer size must be a multiple of "
149           + BYTES_PER_LONG);
150     }
151     this.bufferSize = bufferSize;
152   }
153 
154   /**
155    * Inits with the default buffer size
156    */
DataVerifier()157   DataVerifier() {
158     this(Constants.BUFFERSIZE);
159   }
160 
161   /**
162    * Verifies a buffer of a given size using the given start hash offset
163    *
164    * @param buf
165    *          the buffer to verify
166    * @param size
167    *          the number of bytes to be used in that buffer
168    * @param startOffset
169    *          the start hash offset
170    * @param hasher
171    *          the hasher to use for calculating expected values
172    *
173    * @return ResumeBytes a set of data about the next offset and chunks analyzed
174    */
verifyBuffer(ByteBuffer buf, int size, long startOffset, DataHasher hasher)175   private VerifyInfo verifyBuffer(ByteBuffer buf, int size, long startOffset,
176       DataHasher hasher) {
177     ByteBuffer cmpBuf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
178     long hashOffset = startOffset;
179     long chunksSame = 0;
180     long chunksDifferent = 0;
181     for (long i = 0; i < size; ++i) {
182       cmpBuf.put(buf.get());
183       if (!cmpBuf.hasRemaining()) {
184         cmpBuf.rewind();
185         long receivedData = cmpBuf.getLong();
186         cmpBuf.rewind();
187         long expected = hasher.generate(hashOffset);
188         hashOffset += BYTES_PER_LONG;
189         if (receivedData == expected) {
190           ++chunksSame;
191         } else {
192           ++chunksDifferent;
193         }
194       }
195     }
196     // any left over??
197     if (cmpBuf.hasRemaining() && cmpBuf.position() != 0) {
198       // partial capture
199       // zero fill and compare with zero filled
200       int curSize = cmpBuf.position();
201       while (cmpBuf.hasRemaining()) {
202         cmpBuf.put((byte) 0);
203       }
204       long expected = hasher.generate(hashOffset);
205       ByteBuffer tempBuf = ByteBuffer.wrap(new byte[BYTES_PER_LONG]);
206       tempBuf.putLong(expected);
207       tempBuf.position(curSize);
208       while (tempBuf.hasRemaining()) {
209         tempBuf.put((byte) 0);
210       }
211       cmpBuf.rewind();
212       tempBuf.rewind();
213       if (cmpBuf.equals(tempBuf)) {
214         ++chunksSame;
215       } else {
216         ++chunksDifferent;
217       }
218     }
219     return new VerifyInfo(chunksSame, chunksDifferent);
220   }
221 
222   /**
223    * Determines the offset to use given a byte counter
224    *
225    * @param byteRead
226    *
227    * @return offset position
228    */
determineOffset(long byteRead)229   private long determineOffset(long byteRead) {
230     if (byteRead < 0) {
231       byteRead = 0;
232     }
233     return (byteRead / BYTES_PER_LONG) * BYTES_PER_LONG;
234   }
235 
236   /**
237    * Verifies a given number of bytes from a file - less number of bytes may be
238    * read if a header can not be read in due to the byte limit
239    *
240    * @param byteAm
241    *          the byte amount to limit to (should be less than or equal to file
242    *          size)
243    *
244    * @param in
245    *          the input stream to read from
246    *
247    * @return VerifyOutput with data about reads
248    *
249    * @throws IOException
250    *           if a read failure occurs
251    *
252    * @throws BadFileException
253    *           if a header can not be read or end of file is reached
254    *           unexpectedly
255    */
verifyFile(long byteAm, DataInputStream in)256   VerifyOutput verifyFile(long byteAm, DataInputStream in)
257       throws IOException, BadFileException {
258     return verifyBytes(byteAm, 0, in);
259   }
260 
261   /**
262    * Verifies a given number of bytes from a file - less number of bytes may be
263    * read if a header can not be read in due to the byte limit
264    *
265    * @param byteAm
266    *          the byte amount to limit to (should be less than or equal to file
267    *          size)
268    *
269    * @param bytesRead
270    *          the starting byte location
271    *
272    * @param in
273    *          the input stream to read from
274    *
275    * @return VerifyOutput with data about reads
276    *
277    * @throws IOException
278    *           if a read failure occurs
279    *
280    * @throws BadFileException
281    *           if a header can not be read or end of file is reached
282    *           unexpectedly
283    */
verifyBytes(long byteAm, long bytesRead, DataInputStream in)284   private VerifyOutput verifyBytes(long byteAm, long bytesRead,
285       DataInputStream in) throws IOException, BadFileException {
286     if (byteAm <= 0) {
287       return new VerifyOutput(0, 0, 0, 0);
288     }
289     long chunksSame = 0;
290     long chunksDifferent = 0;
291     long readTime = 0;
292     long bytesLeft = byteAm;
293     long bufLeft = 0;
294     long bufRead = 0;
295     long seqNum = 0;
296     DataHasher hasher = null;
297     ByteBuffer readBuf = ByteBuffer.wrap(new byte[bufferSize]);
298     while (bytesLeft > 0) {
299       if (bufLeft <= 0) {
300         if (bytesLeft < DataWriter.getHeaderLength()) {
301           // no bytes left to read a header
302           break;
303         }
304         // time to read a new header
305         ReadInfo header = null;
306         try {
307           header = readHeader(in);
308         } catch (EOFException e) {
309           // eof ok on header reads
310           // but not on data readers
311           break;
312         }
313         ++seqNum;
314         hasher = new DataHasher(header.getHashValue());
315         bufLeft = header.getByteAm();
316         readTime += header.getTimeTaken();
317         bytesRead += header.getBytesRead();
318         bytesLeft -= header.getBytesRead();
319         bufRead = 0;
320         // number of bytes to read greater than how many we want to read
321         if (bufLeft > bytesLeft) {
322           bufLeft = bytesLeft;
323         }
324         // does the buffer amount have anything??
325         if (bufLeft <= 0) {
326           continue;
327         }
328       }
329       // figure out the buffer size to read
330       int bufSize = bufferSize;
331       if (bytesLeft < bufSize) {
332         bufSize = (int) bytesLeft;
333       }
334       if (bufLeft < bufSize) {
335         bufSize = (int) bufLeft;
336       }
337       // read it in
338       try {
339         readBuf.rewind();
340         long startTime = Timer.now();
341         in.readFully(readBuf.array(), 0, bufSize);
342         readTime += Timer.elapsed(startTime);
343       } catch (EOFException e) {
344         throw new BadFileException(
345             "Could not read the number of expected data bytes " + bufSize
346                 + " due to unexpected end of file during sequence " + seqNum, e);
347       }
348       // update the counters
349       bytesRead += bufSize;
350       bytesLeft -= bufSize;
351       bufLeft -= bufSize;
352       // verify what we read
353       readBuf.rewind();
354       // figure out the expected hash offset start point
355       long vOffset = determineOffset(bufRead);
356       // now update for new position
357       bufRead += bufSize;
358       // verify
359       VerifyInfo verifyRes = verifyBuffer(readBuf, bufSize, vOffset, hasher);
360       // update the verification counters
361       chunksSame += verifyRes.getSame();
362       chunksDifferent += verifyRes.getDifferent();
363     }
364     return new VerifyOutput(chunksSame, chunksDifferent, bytesRead, readTime);
365   }
366 
367 
368   /**
369    * Reads a header from the given input stream
370    *
371    * @param in
372    *          input stream to read from
373    *
374    * @return ReadInfo
375    *
376    * @throws IOException
377    *           if a read error occurs or EOF occurs
378    *
379    * @throws BadFileException
380    *           if end of file occurs or the byte amount read is invalid
381    */
readHeader(DataInputStream in)382   ReadInfo readHeader(DataInputStream in) throws IOException,
383       BadFileException {
384     int headerLen = DataWriter.getHeaderLength();
385     ByteBuffer headerBuf = ByteBuffer.wrap(new byte[headerLen]);
386     long elapsed = 0;
387     {
388       long startTime = Timer.now();
389       in.readFully(headerBuf.array());
390       elapsed += Timer.elapsed(startTime);
391     }
392     headerBuf.rewind();
393     long hashValue = headerBuf.getLong();
394     long byteAvailable = headerBuf.getLong();
395     if (byteAvailable < 0) {
396       throw new BadFileException("Invalid negative amount " + byteAvailable
397           + " determined for header data amount");
398     }
399     return new ReadInfo(byteAvailable, hashValue, elapsed, headerLen);
400   }
401 }
402