1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.junit.Assert.assertTrue; 21 22 import java.io.EOFException; 23 import java.io.IOException; 24 import java.security.PrivilegedExceptionAction; 25 26 import org.apache.commons.logging.impl.Log4JLogger; 27 import org.apache.hadoop.conf.Configuration; 28 import org.apache.hadoop.fs.FSDataInputStream; 29 import org.apache.hadoop.fs.FSDataOutputStream; 30 import org.apache.hadoop.fs.FileSystem; 31 import org.apache.hadoop.fs.Path; 32 import org.apache.hadoop.hdfs.protocol.Block; 33 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; 34 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; 35 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; 36 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 37 import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 38 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 39 import org.apache.hadoop.hdfs.server.datanode.DataNode; 40 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; 41 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; 42 import org.apache.hadoop.hdfs.server.namenode.LeaseManager; 43 import org.apache.hadoop.ipc.RPC; 44 import org.apache.hadoop.security.UserGroupInformation; 45 import org.apache.hadoop.security.token.Token; 46 import org.apache.log4j.Level; 47 import org.junit.Assert; 48 import org.junit.Test; 49 50 51 /** 52 * Test for short circuit read functionality using {@link BlockReaderLocal}. 53 * When a block is being read by a client is on the local datanode, instead of 54 * using {@link DataTransferProtocol} and connect to datanode, the short circuit 55 * read allows reading the file directly from the files on the local file 56 * system. 57 */ 58 public class TestShortCircuitLocalRead { 59 static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/"; 60 61 static final long seed = 0xDEADBEEFL; 62 static final int blockSize = 5120; 63 boolean simulatedStorage = false; 64 65 // creates a file but does not close it createFile(FileSystem fileSys, Path name, int repl)66 static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) 67 throws IOException { 68 FSDataOutputStream stm = fileSys.create(name, true, 69 fileSys.getConf().getInt("io.file.buffer.size", 4096), 70 (short)repl, (long)blockSize); 71 return stm; 72 } 73 checkData(byte[] actual, int from, byte[] expected, String message)74 static private void checkData(byte[] actual, int from, byte[] expected, 75 String message) { 76 checkData(actual, from, expected, actual.length, message); 77 } 78 checkData(byte[] actual, int from, byte[] expected, int len, String message)79 static private void checkData(byte[] actual, int from, byte[] expected, 80 int len, String message) { 81 for (int idx = 0; idx < len; idx++) { 82 if (expected[from + idx] != actual[idx]) { 83 Assert.fail(message + " byte " + (from + idx) + " differs. expected " 84 + expected[from + idx] + " actual " + actual[idx]); 85 } 86 } 87 } 88 checkFileContent(FileSystem fs, Path name, byte[] expected, int readOffset)89 static void checkFileContent(FileSystem fs, Path name, byte[] expected, 90 int readOffset) throws IOException { 91 FSDataInputStream stm = fs.open(name); 92 byte[] actual = new byte[expected.length-readOffset]; 93 stm.readFully(readOffset, actual); 94 checkData(actual, readOffset, expected, "Read 2"); 95 stm.close(); 96 // Now read using a different API. 97 actual = new byte[expected.length-readOffset]; 98 stm = fs.open(name); 99 long skipped = stm.skip(readOffset); 100 Assert.assertEquals(skipped, readOffset); 101 //Read a small number of bytes first. 102 int nread = stm.read(actual, 0, 3); 103 nread += stm.read(actual, nread, 2); 104 //Read across chunk boundary 105 nread += stm.read(actual, nread, 517); 106 checkData(actual, readOffset, expected, nread, "A few bytes"); 107 //Now read rest of it 108 while (nread < actual.length) { 109 int nbytes = stm.read(actual, nread, actual.length - nread); 110 if (nbytes < 0) { 111 throw new EOFException("End of file reached before reading fully."); 112 } 113 nread += nbytes; 114 } 115 checkData(actual, readOffset, expected, "Read 3"); 116 stm.close(); 117 } 118 119 /** 120 * Test that file data can be read by reading the block file 121 * directly from the local store. 122 */ doTestShortCircuitRead(boolean ignoreChecksum, int size, int readOffset)123 public void doTestShortCircuitRead(boolean ignoreChecksum, int size, 124 int readOffset) throws IOException { 125 Configuration conf = new Configuration(); 126 conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); 127 conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, 128 ignoreChecksum); 129 conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, 130 UserGroupInformation.getCurrentUser().getShortUserName()); 131 if (simulatedStorage) { 132 conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); 133 } 134 MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); 135 FileSystem fs = cluster.getFileSystem(); 136 try { 137 // check that / exists 138 Path path = new Path("/"); 139 assertTrue("/ should be a directory", 140 fs.getFileStatus(path).isDir() == true); 141 142 byte[] fileData = AppendTestUtil.randomBytes(seed, size); 143 // create a new file in home directory. Do not close it. 144 Path file1 = new Path("filelocal.dat"); 145 FSDataOutputStream stm = createFile(fs, file1, 1); 146 147 // write to file 148 stm.write(fileData); 149 stm.close(); 150 checkFileContent(fs, file1, fileData, readOffset); 151 } finally { 152 fs.close(); 153 cluster.shutdown(); 154 } 155 } 156 157 @Test testFileLocalReadNoChecksum()158 public void testFileLocalReadNoChecksum() throws IOException { 159 doTestShortCircuitRead(true, 3*blockSize+100, 0); 160 } 161 162 @Test testFileLocalReadChecksum()163 public void testFileLocalReadChecksum() throws IOException { 164 doTestShortCircuitRead(false, 3*blockSize+100, 0); 165 } 166 167 @Test testSmallFileLocalRead()168 public void testSmallFileLocalRead() throws IOException { 169 doTestShortCircuitRead(false, 13, 0); 170 doTestShortCircuitRead(false, 13, 5); 171 doTestShortCircuitRead(true, 13, 0); 172 doTestShortCircuitRead(true, 13, 5); 173 } 174 175 @Test testReadFromAnOffset()176 public void testReadFromAnOffset() throws IOException { 177 doTestShortCircuitRead(false, 3*blockSize+100, 777); 178 doTestShortCircuitRead(true, 3*blockSize+100, 777); 179 } 180 181 @Test testLongFile()182 public void testLongFile() throws IOException { 183 doTestShortCircuitRead(false, 10*blockSize+100, 777); 184 doTestShortCircuitRead(true, 10*blockSize+100, 777); 185 } 186 187 @Test testGetBlockLocalPathInfo()188 public void testGetBlockLocalPathInfo() throws IOException, InterruptedException { 189 final Configuration conf = new Configuration(); 190 conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser"); 191 MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); 192 cluster.waitActive(); 193 final DataNode dn = cluster.getDataNodes().get(0); 194 FileSystem fs = cluster.getFileSystem(); 195 try { 196 DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23); 197 UserGroupInformation aUgi = UserGroupInformation 198 .createRemoteUser("alloweduser"); 199 LocatedBlocks lb = cluster.getNameNode().getBlockLocations("/tmp/x", 0, 200 16); 201 // Create a new block object, because the block inside LocatedBlock at 202 // namenode is of type BlockInfo. 203 Block blk = new Block(lb.get(0).getBlock()); 204 Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken(); 205 final DatanodeInfo dnInfo = lb.get(0).getLocations()[0]; 206 ClientDatanodeProtocol proxy = aUgi 207 .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { 208 @Override 209 public ClientDatanodeProtocol run() throws Exception { 210 return DFSClient.createClientDatanodeProtocolProxy( 211 dnInfo, conf, 60000, false); 212 } 213 }); 214 215 //This should succeed 216 BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token); 217 Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(), 218 blpi.getBlockPath()); 219 RPC.stopProxy(proxy); 220 221 // Now try with a not allowed user. 222 UserGroupInformation bUgi = UserGroupInformation 223 .createRemoteUser("notalloweduser"); 224 proxy = bUgi 225 .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { 226 @Override 227 public ClientDatanodeProtocol run() throws Exception { 228 return DFSClient.createClientDatanodeProtocolProxy( 229 dnInfo, conf, 60000, false); 230 } 231 }); 232 try { 233 proxy.getBlockLocalPathInfo(blk, token); 234 Assert.fail("The call should have failed as " + bUgi.getShortUserName() 235 + " is not allowed to call getBlockLocalPathInfo"); 236 } catch (IOException ex) { 237 Assert.assertTrue(ex.getMessage().contains( 238 "not allowed to call getBlockLocalPathInfo")); 239 } finally { 240 RPC.stopProxy(proxy); 241 } 242 } finally { 243 fs.close(); 244 cluster.shutdown(); 245 } 246 } 247 248 /** 249 * Test to run benchmarks between shortcircuit read vs regular read with 250 * specified number of threads simultaneously reading. 251 * <br> 252 * Run this using the following command: 253 * bin/hadoop --config confdir \ 254 * org.apache.hadoop.hdfs.TestShortCircuitLocalRead \ 255 * <shortcircuit on?> <checsum on?> <Number of threads> 256 */ main(String[] args)257 public static void main(String[] args) throws Exception { 258 ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.INFO); 259 ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.INFO); 260 ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.INFO); 261 262 if (args.length != 3) { 263 System.out.println("Usage: test shortcircuit checksum threadCount"); 264 System.exit(1); 265 } 266 boolean shortcircuit = Boolean.valueOf(args[0]); 267 boolean checksum = Boolean.valueOf(args[1]); 268 int threadCount = Integer.valueOf(args[2]); 269 270 // Setup create a file 271 Configuration conf = new Configuration(); 272 conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit); 273 conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, 274 checksum); 275 276 //Override fileSize and DATA_TO_WRITE to much larger values for benchmark test 277 int fileSize = 1000 * blockSize + 100; // File with 1000 blocks 278 final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize); 279 280 // create a new file in home directory. Do not close it. 281 final Path file1 = new Path("filelocal.dat"); 282 final FileSystem fs = FileSystem.get(conf); 283 FSDataOutputStream stm = createFile(fs, file1, 1); 284 285 stm.write(dataToWrite); 286 stm.close(); 287 288 long start = System.currentTimeMillis(); 289 final int iteration = 20; 290 Thread[] threads = new Thread[threadCount]; 291 for (int i = 0; i < threadCount; i++) { 292 threads[i] = new Thread() { 293 public void run() { 294 for (int i = 0; i < iteration; i++) { 295 try { 296 checkFileContent(fs, file1, dataToWrite, 0); 297 } catch (IOException e) { 298 e.printStackTrace(); 299 } 300 } 301 } 302 }; 303 } 304 for (int i = 0; i < threadCount; i++) { 305 threads[i].start(); 306 } 307 for (int i = 0; i < threadCount; i++) { 308 threads[i].join(); 309 } 310 long end = System.currentTimeMillis(); 311 System.out.println("Iteration " + iteration + " took " + (end - start)); 312 fs.delete(file1, false); 313 } 314 } 315