1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
20 import static org.junit.Assert.assertTrue;
21 
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.security.PrivilegedExceptionAction;
25 
26 import org.apache.commons.logging.impl.Log4JLogger;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FSDataInputStream;
29 import org.apache.hadoop.fs.FSDataOutputStream;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hdfs.protocol.Block;
33 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
34 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
35 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
36 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
37 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
38 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
39 import org.apache.hadoop.hdfs.server.datanode.DataNode;
40 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
41 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
42 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
43 import org.apache.hadoop.ipc.RPC;
44 import org.apache.hadoop.security.UserGroupInformation;
45 import org.apache.hadoop.security.token.Token;
46 import org.apache.log4j.Level;
47 import org.junit.Assert;
48 import org.junit.Test;
49 
50 
51 /**
52  * Test for short circuit read functionality using {@link BlockReaderLocal}.
 * When a block being read by a client is on the local datanode, instead of
 * using {@link DataTransferProtocol} and connecting to the datanode, the short
 * circuit read allows reading the block directly from the files on the local
 * file system.
57  */
58 public class TestShortCircuitLocalRead {
59   static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
60 
61   static final long seed = 0xDEADBEEFL;
62   static final int blockSize = 5120;
63   boolean simulatedStorage = false;
64 
65   // creates a file but does not close it
createFile(FileSystem fileSys, Path name, int repl)66   static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
67       throws IOException {
68     FSDataOutputStream stm = fileSys.create(name, true,
69                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
70                                             (short)repl, (long)blockSize);
71     return stm;
72   }
73 
checkData(byte[] actual, int from, byte[] expected, String message)74   static private void checkData(byte[] actual, int from, byte[] expected,
75       String message) {
76     checkData(actual, from, expected, actual.length, message);
77   }
78 
checkData(byte[] actual, int from, byte[] expected, int len, String message)79   static private void checkData(byte[] actual, int from, byte[] expected,
80       int len, String message) {
81     for (int idx = 0; idx < len; idx++) {
82       if (expected[from + idx] != actual[idx]) {
83         Assert.fail(message + " byte " + (from + idx) + " differs. expected "
84             + expected[from + idx] + " actual " + actual[idx]);
85       }
86     }
87   }
88 
checkFileContent(FileSystem fs, Path name, byte[] expected, int readOffset)89   static void checkFileContent(FileSystem fs, Path name, byte[] expected,
90       int readOffset) throws IOException {
91     FSDataInputStream stm = fs.open(name);
92     byte[] actual = new byte[expected.length-readOffset];
93     stm.readFully(readOffset, actual);
94     checkData(actual, readOffset, expected, "Read 2");
95     stm.close();
96     // Now read using a different API.
97     actual = new byte[expected.length-readOffset];
98     stm = fs.open(name);
99     long skipped = stm.skip(readOffset);
100     Assert.assertEquals(skipped, readOffset);
101     //Read a small number of bytes first.
102     int nread = stm.read(actual, 0, 3);
103     nread += stm.read(actual, nread, 2);
104     //Read across chunk boundary
105     nread += stm.read(actual, nread, 517);
106     checkData(actual, readOffset, expected, nread, "A few bytes");
107     //Now read rest of it
108     while (nread < actual.length) {
109       int nbytes = stm.read(actual, nread, actual.length - nread);
110       if (nbytes < 0) {
111         throw new EOFException("End of file reached before reading fully.");
112       }
113       nread += nbytes;
114     }
115     checkData(actual, readOffset, expected, "Read 3");
116     stm.close();
117   }
118 
119   /**
120    * Test that file data can be read by reading the block file
121    * directly from the local store.
122    */
doTestShortCircuitRead(boolean ignoreChecksum, int size, int readOffset)123   public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
124       int readOffset) throws IOException {
125     Configuration conf = new Configuration();
126     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
127     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
128         ignoreChecksum);
129     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
130         UserGroupInformation.getCurrentUser().getShortUserName());
131     if (simulatedStorage) {
132       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
133     }
134     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
135     FileSystem fs = cluster.getFileSystem();
136     try {
137       // check that / exists
138       Path path = new Path("/");
139       assertTrue("/ should be a directory",
140                  fs.getFileStatus(path).isDir() == true);
141 
142       byte[] fileData = AppendTestUtil.randomBytes(seed, size);
143       // create a new file in home directory. Do not close it.
144       Path file1 = new Path("filelocal.dat");
145       FSDataOutputStream stm = createFile(fs, file1, 1);
146 
147       // write to file
148       stm.write(fileData);
149       stm.close();
150       checkFileContent(fs, file1, fileData, readOffset);
151     } finally {
152       fs.close();
153       cluster.shutdown();
154     }
155   }
156 
157   @Test
testFileLocalReadNoChecksum()158   public void testFileLocalReadNoChecksum() throws IOException {
159     doTestShortCircuitRead(true, 3*blockSize+100, 0);
160   }
161 
162   @Test
testFileLocalReadChecksum()163   public void testFileLocalReadChecksum() throws IOException {
164     doTestShortCircuitRead(false, 3*blockSize+100, 0);
165   }
166 
167   @Test
testSmallFileLocalRead()168   public void testSmallFileLocalRead() throws IOException {
169     doTestShortCircuitRead(false, 13, 0);
170     doTestShortCircuitRead(false, 13, 5);
171     doTestShortCircuitRead(true, 13, 0);
172     doTestShortCircuitRead(true, 13, 5);
173   }
174 
175   @Test
testReadFromAnOffset()176   public void testReadFromAnOffset() throws IOException {
177     doTestShortCircuitRead(false, 3*blockSize+100, 777);
178     doTestShortCircuitRead(true, 3*blockSize+100, 777);
179   }
180 
181   @Test
testLongFile()182   public void testLongFile() throws IOException {
183     doTestShortCircuitRead(false, 10*blockSize+100, 777);
184     doTestShortCircuitRead(true, 10*blockSize+100, 777);
185   }
186 
187   @Test
testGetBlockLocalPathInfo()188   public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
189     final Configuration conf = new Configuration();
190     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
191     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
192     cluster.waitActive();
193     final DataNode dn = cluster.getDataNodes().get(0);
194     FileSystem fs = cluster.getFileSystem();
195     try {
196       DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
197       UserGroupInformation aUgi = UserGroupInformation
198           .createRemoteUser("alloweduser");
199       LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0,
200           16);
201       // Create a new block object, because the block inside LocatedBlock at
202       // namenode is of type BlockInfo.
203       Block blk = new Block(lb.get(0).getBlock());
204       Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
205       final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
206       ClientDatanodeProtocol proxy = aUgi
207           .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
208             @Override
209             public ClientDatanodeProtocol run() throws Exception {
210               return DFSClient.createClientDatanodeProtocolProxy(
211                   dnInfo, conf, 60000, false);
212             }
213           });
214 
215       //This should succeed
216       BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
217       Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
218           blpi.getBlockPath());
219       RPC.stopProxy(proxy);
220 
221       // Now try with a not allowed user.
222       UserGroupInformation bUgi = UserGroupInformation
223           .createRemoteUser("notalloweduser");
224       proxy = bUgi
225           .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
226             @Override
227             public ClientDatanodeProtocol run() throws Exception {
228               return DFSClient.createClientDatanodeProtocolProxy(
229                   dnInfo, conf, 60000, false);
230             }
231           });
232       try {
233         proxy.getBlockLocalPathInfo(blk, token);
234         Assert.fail("The call should have failed as " + bUgi.getShortUserName()
235             + " is not allowed to call getBlockLocalPathInfo");
236       } catch (IOException ex) {
237         Assert.assertTrue(ex.getMessage().contains(
238             "not allowed to call getBlockLocalPathInfo"));
239       } finally {
240         RPC.stopProxy(proxy);
241       }
242     } finally {
243       fs.close();
244       cluster.shutdown();
245     }
246   }
247 
248   /**
249    * Test to run benchmarks between shortcircuit read vs regular read with
250    * specified number of threads simultaneously reading.
251    * <br>
252    * Run this using the following command:
253    * bin/hadoop --config confdir \
254    * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
255    * <shortcircuit on?> <checsum on?> <Number of threads>
256    */
main(String[] args)257   public static void main(String[] args) throws Exception {
258     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO);
259     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO);
260     ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO);
261 
262     if (args.length != 3) {
263       System.out.println("Usage: test shortcircuit checksum threadCount");
264       System.exit(1);
265     }
266     boolean shortcircuit = Boolean.valueOf(args[0]);
267     boolean checksum = Boolean.valueOf(args[1]);
268     int threadCount = Integer.valueOf(args[2]);
269 
270     // Setup create a file
271     Configuration conf = new Configuration();
272     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
273     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
274         checksum);
275 
276     //Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
277     int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
278     final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
279 
280     // create a new file in home directory. Do not close it.
281     final Path file1 = new Path("filelocal.dat");
282     final FileSystem fs = FileSystem.get(conf);
283     FSDataOutputStream stm = createFile(fs, file1, 1);
284 
285     stm.write(dataToWrite);
286     stm.close();
287 
288     long start = System.currentTimeMillis();
289     final int iteration = 20;
290     Thread[] threads = new Thread[threadCount];
291     for (int i = 0; i < threadCount; i++) {
292       threads[i] = new Thread() {
293         public void run() {
294           for (int i = 0; i < iteration; i++) {
295             try {
296               checkFileContent(fs, file1, dataToWrite, 0);
297             } catch (IOException e) {
298               e.printStackTrace();
299             }
300           }
301         }
302       };
303     }
304     for (int i = 0; i < threadCount; i++) {
305       threads[i].start();
306     }
307     for (int i = 0; i < threadCount; i++) {
308       threads[i].join();
309     }
310     long end = System.currentTimeMillis();
311     System.out.println("Iteration " + iteration + " took " + (end - start));
312     fs.delete(file1, false);
313   }
314 }
315