1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.fs; 19 20 import org.apache.commons.logging.Log; 21 import org.apache.commons.logging.LogFactory; 22 import org.apache.hadoop.conf.Configuration; 23 import org.apache.hadoop.fs.FSDataInputStream; 24 import org.apache.hadoop.hdfs.DFSConfigKeys; 25 import org.apache.hadoop.hdfs.DFSTestUtil; 26 import org.apache.hadoop.hdfs.DistributedFileSystem; 27 import org.apache.hadoop.hdfs.MiniDFSCluster; 28 import org.apache.hadoop.hdfs.PeerCache; 29 import org.apache.hadoop.io.IOUtils; 30 import org.junit.Assert; 31 import org.junit.Test; 32 33 public class TestUnbuffer { 34 private static final Log LOG = 35 LogFactory.getLog(TestUnbuffer.class.getName()); 36 37 /** 38 * Test that calling Unbuffer closes sockets. 39 */ 40 @Test testUnbufferClosesSockets()41 public void testUnbufferClosesSockets() throws Exception { 42 Configuration conf = new Configuration(); 43 // Set a new ClientContext. This way, we will have our own PeerCache, 44 // rather than sharing one with other unit tests. 45 conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, 46 "testUnbufferClosesSocketsContext"); 47 48 // Disable short-circuit reads. With short-circuit, we wouldn't hold open a 49 // TCP socket. 50 conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); 51 52 // Set a really long socket timeout to avoid test timing issues. 53 conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 54 100000000L); 55 conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 56 100000000L); 57 58 MiniDFSCluster cluster = null; 59 FSDataInputStream stream = null; 60 try { 61 cluster = new MiniDFSCluster.Builder(conf).build(); 62 DistributedFileSystem dfs = (DistributedFileSystem) 63 FileSystem.newInstance(conf); 64 final Path TEST_PATH = new Path("/test1"); 65 DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1); 66 stream = dfs.open(TEST_PATH); 67 // Read a byte. This will trigger the creation of a block reader. 68 stream.seek(2); 69 int b = stream.read(); 70 Assert.assertTrue(-1 != b); 71 72 // The Peer cache should start off empty. 73 PeerCache cache = dfs.getClient().getClientContext().getPeerCache(); 74 Assert.assertEquals(0, cache.size()); 75 76 // Unbuffer should clear the block reader and return the socket to the 77 // cache. 78 stream.unbuffer(); 79 stream.seek(2); 80 Assert.assertEquals(1, cache.size()); 81 int b2 = stream.read(); 82 Assert.assertEquals(b, b2); 83 } finally { 84 if (stream != null) { 85 IOUtils.cleanup(null, stream); 86 } 87 if (cluster != null) { 88 cluster.shutdown(); 89 } 90 } 91 } 92 93 /** 94 * Test opening many files via TCP (not short-circuit). 95 * 96 * This is practical when using unbuffer, because it reduces the number of 97 * sockets and amount of memory that we use. 98 */ 99 @Test testOpenManyFilesViaTcp()100 public void testOpenManyFilesViaTcp() throws Exception { 101 final int NUM_OPENS = 500; 102 Configuration conf = new Configuration(); 103 conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); 104 MiniDFSCluster cluster = null; 105 FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS]; 106 try { 107 cluster = new MiniDFSCluster.Builder(conf).build(); 108 DistributedFileSystem dfs = cluster.getFileSystem(); 109 final Path TEST_PATH = new Path("/testFile"); 110 DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1); 111 112 for (int i = 0; i < NUM_OPENS; i++) { 113 streams[i] = dfs.open(TEST_PATH); 114 LOG.info("opening file " + i + "..."); 115 Assert.assertTrue(-1 != streams[i].read()); 116 streams[i].unbuffer(); 117 } 118 } finally { 119 for (FSDataInputStream stream : streams) { 120 IOUtils.cleanup(null, stream); 121 } 122 if (cluster != null) { 123 cluster.shutdown(); 124 } 125 } 126 } 127 } 128