1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.net.unix.DomainSocket;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.common.collect.HashMultiset;
40 
41 public class TestPeerCache {
42   static final Log LOG = LogFactory.getLog(TestPeerCache.class);
43 
44   private static class FakePeer implements Peer {
45     private boolean closed = false;
46     private final boolean hasDomain;
47 
48     private final DatanodeID dnId;
49 
FakePeer(DatanodeID dnId, boolean hasDomain)50     public FakePeer(DatanodeID dnId, boolean hasDomain) {
51       this.dnId = dnId;
52       this.hasDomain = hasDomain;
53     }
54 
55     @Override
getInputStreamChannel()56     public ReadableByteChannel getInputStreamChannel() {
57       throw new UnsupportedOperationException();
58     }
59 
60     @Override
setReadTimeout(int timeoutMs)61     public void setReadTimeout(int timeoutMs) throws IOException {
62       throw new UnsupportedOperationException();
63     }
64 
65     @Override
getReceiveBufferSize()66     public int getReceiveBufferSize() throws IOException {
67       throw new UnsupportedOperationException();
68     }
69 
70     @Override
getTcpNoDelay()71     public boolean getTcpNoDelay() throws IOException {
72       return false;
73     }
74 
75     @Override
setWriteTimeout(int timeoutMs)76     public void setWriteTimeout(int timeoutMs) throws IOException {
77       throw new UnsupportedOperationException();
78     }
79 
80     @Override
isClosed()81     public boolean isClosed() {
82       return closed;
83     }
84 
85     @Override
close()86     public void close() throws IOException {
87       closed = true;
88     }
89 
90     @Override
getRemoteAddressString()91     public String getRemoteAddressString() {
92       return dnId.getInfoAddr();
93     }
94 
95     @Override
getLocalAddressString()96     public String getLocalAddressString() {
97       return "127.0.0.1:123";
98     }
99 
100     @Override
getInputStream()101     public InputStream getInputStream() throws IOException {
102       throw new UnsupportedOperationException();
103     }
104 
105     @Override
getOutputStream()106     public OutputStream getOutputStream() throws IOException {
107       throw new UnsupportedOperationException();
108     }
109 
110     @Override
isLocal()111     public boolean isLocal() {
112       return true;
113     }
114 
115     @Override
toString()116     public String toString() {
117       return "FakePeer(dnId=" + dnId + ")";
118     }
119 
120     @Override
getDomainSocket()121     public DomainSocket getDomainSocket() {
122       if (!hasDomain) return null;
123       // Return a mock which throws an exception whenever any function is
124       // called.
125       return Mockito.mock(DomainSocket.class,
126           new Answer<Object>() {
127             @Override
128             public Object answer(InvocationOnMock invocation)
129                 throws Throwable {
130               throw new RuntimeException("injected fault.");
131           } });
132     }
133 
134     @Override
equals(Object o)135     public boolean equals(Object o) {
136       if (!(o instanceof FakePeer)) return false;
137       FakePeer other = (FakePeer)o;
138       return hasDomain == other.hasDomain &&
139           dnId.equals(other.dnId);
140     }
141 
142     @Override
hashCode()143     public int hashCode() {
144       return dnId.hashCode() ^ (hasDomain ? 1 : 0);
145     }
146 
147     @Override
hasSecureChannel()148     public boolean hasSecureChannel() {
149       return false;
150     }
151   }
152 
153   @Test
154   public void testAddAndRetrieve() throws Exception {
155     PeerCache cache = new PeerCache(3, 100000);
156     DatanodeID dnId = new DatanodeID("192.168.0.1",
157           "fakehostname", "fake_datanode_id",
158           100, 101, 102, 103);
159     FakePeer peer = new FakePeer(dnId, false);
160     cache.put(dnId, peer);
161     assertTrue(!peer.isClosed());
162     assertEquals(1, cache.size());
163     assertEquals(peer, cache.get(dnId, false));
164     assertEquals(0, cache.size());
165     cache.close();
166   }
167 
168   @Test
169   public void testExpiry() throws Exception {
170     final int CAPACITY = 3;
171     final int EXPIRY_PERIOD = 10;
172     PeerCache cache = new PeerCache(CAPACITY, EXPIRY_PERIOD);
173     DatanodeID dnIds[] = new DatanodeID[CAPACITY];
174     FakePeer peers[] = new FakePeer[CAPACITY];
175     for (int i = 0; i < CAPACITY; ++i) {
176       dnIds[i] = new DatanodeID("192.168.0.1",
177           "fakehostname_" + i, "fake_datanode_id",
178           100, 101, 102, 103);
179       peers[i] = new FakePeer(dnIds[i], false);
180     }
181     for (int i = 0; i < CAPACITY; ++i) {
182       cache.put(dnIds[i], peers[i]);
183     }
184 
185     // Wait for the peers to expire
186     Thread.sleep(EXPIRY_PERIOD * 50);
187     assertEquals(0, cache.size());
188 
189     // make sure that the peers were closed when they were expired
190     for (int i = 0; i < CAPACITY; ++i) {
191       assertTrue(peers[i].isClosed());
192     }
193 
194     // sleep for another second and see if
195     // the daemon thread runs fine on empty cache
196     Thread.sleep(EXPIRY_PERIOD * 50);
197     cache.close();
198   }
199 
200   @Test
201   public void testEviction() throws Exception {
202     final int CAPACITY = 3;
203     PeerCache cache = new PeerCache(CAPACITY, 100000);
204     DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
205     FakePeer peers[] = new FakePeer[CAPACITY + 1];
206     for (int i = 0; i < dnIds.length; ++i) {
207       dnIds[i] = new DatanodeID("192.168.0.1",
208           "fakehostname_" + i, "fake_datanode_id_" + i,
209           100, 101, 102, 103);
210       peers[i] = new FakePeer(dnIds[i], false);
211     }
212     for (int i = 0; i < CAPACITY; ++i) {
213       cache.put(dnIds[i], peers[i]);
214     }
215     // Check that the peers are cached
216     assertEquals(CAPACITY, cache.size());
217 
218     // Add another entry and check that the first entry was evicted
219     cache.put(dnIds[CAPACITY], peers[CAPACITY]);
220     assertEquals(CAPACITY, cache.size());
221     assertSame(null, cache.get(dnIds[0], false));
222 
223     // Make sure that the other entries are still there
224     for (int i = 1; i < CAPACITY; ++i) {
225       Peer peer = cache.get(dnIds[i], false);
226       assertSame(peers[i], peer);
227       assertTrue(!peer.isClosed());
228       peer.close();
229     }
230     assertEquals(1, cache.size());
231     cache.close();
232   }
233 
234   @Test
235   public void testMultiplePeersWithSameKey() throws Exception {
236     final int CAPACITY = 3;
237     PeerCache cache = new PeerCache(CAPACITY, 100000);
238     DatanodeID dnId = new DatanodeID("192.168.0.1",
239           "fakehostname", "fake_datanode_id",
240           100, 101, 102, 103);
241     HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
242     for (int i = 0; i < CAPACITY; ++i) {
243       FakePeer peer = new FakePeer(dnId, false);
244       peers.add(peer);
245       cache.put(dnId, peer);
246     }
247     // Check that all of the peers ended up in the cache
248     assertEquals(CAPACITY, cache.size());
249     while (!peers.isEmpty()) {
250       Peer peer = cache.get(dnId, false);
251       assertTrue(peer != null);
252       assertTrue(!peer.isClosed());
253       peers.remove(peer);
254     }
255     assertEquals(0, cache.size());
256     cache.close();
257   }
258 
259   @Test
260   public void testDomainSocketPeers() throws Exception {
261     final int CAPACITY = 3;
262     PeerCache cache = new PeerCache(CAPACITY, 100000);
263     DatanodeID dnId = new DatanodeID("192.168.0.1",
264           "fakehostname", "fake_datanode_id",
265           100, 101, 102, 103);
266     HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
267     for (int i = 0; i < CAPACITY; ++i) {
268       FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
269       peers.add(peer);
270       cache.put(dnId, peer);
271     }
272     // Check that all of the peers ended up in the cache
273     assertEquals(CAPACITY, cache.size());
274     // Test that get(requireDomainPeer=true) finds the peer with the
275     // domain socket.
276     Peer peer = cache.get(dnId, true);
277     assertTrue(peer.getDomainSocket() != null);
278     peers.remove(peer);
279     // Test that get(requireDomainPeer=true) returns null when there are
280     // no more peers with domain sockets.
281     peer = cache.get(dnId, true);
282     assertTrue(peer == null);
283     // Check that all of the other peers ended up in the cache.
284     while (!peers.isEmpty()) {
285       peer = cache.get(dnId, false);
286       assertTrue(peer != null);
287       assertTrue(!peer.isClosed());
288       peers.remove(peer);
289     }
290     assertEquals(0, cache.size());
291     cache.close();
292   }
293 }
294