1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.fs;
19 
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.fs.FSDataInputStream;
24 import org.apache.hadoop.hdfs.DFSConfigKeys;
25 import org.apache.hadoop.hdfs.DFSTestUtil;
26 import org.apache.hadoop.hdfs.DistributedFileSystem;
27 import org.apache.hadoop.hdfs.MiniDFSCluster;
28 import org.apache.hadoop.hdfs.PeerCache;
29 import org.apache.hadoop.io.IOUtils;
30 import org.junit.Assert;
31 import org.junit.Test;
32 
33 public class TestUnbuffer {
34   private static final Log LOG =
35       LogFactory.getLog(TestUnbuffer.class.getName());
36 
37   /**
38    * Test that calling Unbuffer closes sockets.
39    */
40   @Test
testUnbufferClosesSockets()41   public void testUnbufferClosesSockets() throws Exception {
42     Configuration conf = new Configuration();
43     // Set a new ClientContext.  This way, we will have our own PeerCache,
44     // rather than sharing one with other unit tests.
45     conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
46         "testUnbufferClosesSocketsContext");
47 
48     // Disable short-circuit reads.  With short-circuit, we wouldn't hold open a
49     // TCP socket.
50     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
51 
52     // Set a really long socket timeout to avoid test timing issues.
53     conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
54         100000000L);
55     conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
56         100000000L);
57 
58     MiniDFSCluster cluster = null;
59     FSDataInputStream stream = null;
60     try {
61       cluster = new MiniDFSCluster.Builder(conf).build();
62       DistributedFileSystem dfs = (DistributedFileSystem)
63           FileSystem.newInstance(conf);
64       final Path TEST_PATH = new Path("/test1");
65       DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1);
66       stream = dfs.open(TEST_PATH);
67       // Read a byte.  This will trigger the creation of a block reader.
68       stream.seek(2);
69       int b = stream.read();
70       Assert.assertTrue(-1 != b);
71 
72       // The Peer cache should start off empty.
73       PeerCache cache = dfs.getClient().getClientContext().getPeerCache();
74       Assert.assertEquals(0, cache.size());
75 
76       // Unbuffer should clear the block reader and return the socket to the
77       // cache.
78       stream.unbuffer();
79       stream.seek(2);
80       Assert.assertEquals(1, cache.size());
81       int b2 = stream.read();
82       Assert.assertEquals(b, b2);
83     } finally {
84       if (stream != null) {
85         IOUtils.cleanup(null, stream);
86       }
87       if (cluster != null) {
88         cluster.shutdown();
89       }
90     }
91   }
92 
93   /**
94    * Test opening many files via TCP (not short-circuit).
95    *
96    * This is practical when using unbuffer, because it reduces the number of
97    * sockets and amount of memory that we use.
98    */
99   @Test
testOpenManyFilesViaTcp()100   public void testOpenManyFilesViaTcp() throws Exception {
101     final int NUM_OPENS = 500;
102     Configuration conf = new Configuration();
103     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
104     MiniDFSCluster cluster = null;
105     FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
106     try {
107       cluster = new MiniDFSCluster.Builder(conf).build();
108       DistributedFileSystem dfs = cluster.getFileSystem();
109       final Path TEST_PATH = new Path("/testFile");
110       DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);
111 
112       for (int i = 0; i < NUM_OPENS; i++) {
113         streams[i] = dfs.open(TEST_PATH);
114         LOG.info("opening file " + i + "...");
115         Assert.assertTrue(-1 != streams[i].read());
116         streams[i].unbuffer();
117       }
118     } finally {
119       for (FSDataInputStream stream : streams) {
120         IOUtils.cleanup(null, stream);
121       }
122       if (cluster != null) {
123         cluster.shutdown();
124       }
125     }
126   }
127 }
128