1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.junit.Assert.assertEquals; 21 import static org.junit.Assert.assertSame; 22 import static org.junit.Assert.assertTrue; 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.OutputStream; 27 import java.nio.channels.ReadableByteChannel; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.hadoop.hdfs.net.Peer; 32 import org.apache.hadoop.hdfs.protocol.DatanodeID; 33 import org.apache.hadoop.net.unix.DomainSocket; 34 import org.junit.Test; 35 import org.mockito.Mockito; 36 import org.mockito.invocation.InvocationOnMock; 37 import org.mockito.stubbing.Answer; 38 39 import com.google.common.collect.HashMultiset; 40 41 public class TestPeerCache { 42 static final Log LOG = LogFactory.getLog(TestPeerCache.class); 43 44 private static class FakePeer implements Peer { 45 private boolean closed = false; 46 private final boolean hasDomain; 47 48 private final DatanodeID dnId; 49 FakePeer(DatanodeID dnId, boolean hasDomain)50 public FakePeer(DatanodeID dnId, boolean hasDomain) { 51 this.dnId = dnId; 52 this.hasDomain = hasDomain; 53 } 54 55 @Override getInputStreamChannel()56 public ReadableByteChannel getInputStreamChannel() { 57 throw new UnsupportedOperationException(); 58 } 59 60 @Override setReadTimeout(int timeoutMs)61 public void setReadTimeout(int timeoutMs) throws IOException { 62 throw new UnsupportedOperationException(); 63 } 64 65 @Override getReceiveBufferSize()66 public int getReceiveBufferSize() throws IOException { 67 throw new UnsupportedOperationException(); 68 } 69 70 @Override getTcpNoDelay()71 public boolean getTcpNoDelay() throws IOException { 72 return false; 73 } 74 75 @Override setWriteTimeout(int timeoutMs)76 public void setWriteTimeout(int timeoutMs) throws IOException { 77 throw new UnsupportedOperationException(); 78 } 79 80 @Override isClosed()81 public boolean isClosed() { 82 return closed; 83 } 84 85 @Override close()86 public void close() throws IOException { 87 closed = true; 88 } 89 90 @Override getRemoteAddressString()91 public String getRemoteAddressString() { 92 return dnId.getInfoAddr(); 93 } 94 95 @Override getLocalAddressString()96 public String getLocalAddressString() { 97 return "127.0.0.1:123"; 98 } 99 100 @Override getInputStream()101 public InputStream getInputStream() throws IOException { 102 throw new UnsupportedOperationException(); 103 } 104 105 @Override getOutputStream()106 public OutputStream getOutputStream() throws IOException { 107 throw new UnsupportedOperationException(); 108 } 109 110 @Override isLocal()111 public boolean isLocal() { 112 return true; 113 } 114 115 @Override toString()116 public String toString() { 117 return "FakePeer(dnId=" + dnId + ")"; 118 } 119 120 @Override getDomainSocket()121 public DomainSocket getDomainSocket() { 122 if (!hasDomain) return null; 123 // Return a mock which throws an exception whenever any function is 124 // called. 125 return Mockito.mock(DomainSocket.class, 126 new Answer<Object>() { 127 @Override 128 public Object answer(InvocationOnMock invocation) 129 throws Throwable { 130 throw new RuntimeException("injected fault."); 131 } }); 132 } 133 134 @Override equals(Object o)135 public boolean equals(Object o) { 136 if (!(o instanceof FakePeer)) return false; 137 FakePeer other = (FakePeer)o; 138 return hasDomain == other.hasDomain && 139 dnId.equals(other.dnId); 140 } 141 142 @Override hashCode()143 public int hashCode() { 144 return dnId.hashCode() ^ (hasDomain ? 1 : 0); 145 } 146 147 @Override hasSecureChannel()148 public boolean hasSecureChannel() { 149 return false; 150 } 151 } 152 153 @Test 154 public void testAddAndRetrieve() throws Exception { 155 PeerCache cache = new PeerCache(3, 100000); 156 DatanodeID dnId = new DatanodeID("192.168.0.1", 157 "fakehostname", "fake_datanode_id", 158 100, 101, 102, 103); 159 FakePeer peer = new FakePeer(dnId, false); 160 cache.put(dnId, peer); 161 assertTrue(!peer.isClosed()); 162 assertEquals(1, cache.size()); 163 assertEquals(peer, cache.get(dnId, false)); 164 assertEquals(0, cache.size()); 165 cache.close(); 166 } 167 168 @Test 169 public void testExpiry() throws Exception { 170 final int CAPACITY = 3; 171 final int EXPIRY_PERIOD = 10; 172 PeerCache cache = new PeerCache(CAPACITY, EXPIRY_PERIOD); 173 DatanodeID dnIds[] = new DatanodeID[CAPACITY]; 174 FakePeer peers[] = new FakePeer[CAPACITY]; 175 for (int i = 0; i < CAPACITY; ++i) { 176 dnIds[i] = new DatanodeID("192.168.0.1", 177 "fakehostname_" + i, "fake_datanode_id", 178 100, 101, 102, 103); 179 peers[i] = new FakePeer(dnIds[i], false); 180 } 181 for (int i = 0; i < CAPACITY; ++i) { 182 cache.put(dnIds[i], peers[i]); 183 } 184 185 // Wait for the peers to expire 186 Thread.sleep(EXPIRY_PERIOD * 50); 187 assertEquals(0, cache.size()); 188 189 // make sure that the peers were closed when they were expired 190 for (int i = 0; i < CAPACITY; ++i) { 191 assertTrue(peers[i].isClosed()); 192 } 193 194 // sleep for another second and see if 195 // the daemon thread runs fine on empty cache 196 Thread.sleep(EXPIRY_PERIOD * 50); 197 cache.close(); 198 } 199 200 @Test 201 public void testEviction() throws Exception { 202 final int CAPACITY = 3; 203 PeerCache cache = new PeerCache(CAPACITY, 100000); 204 DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1]; 205 FakePeer peers[] = new FakePeer[CAPACITY + 1]; 206 for (int i = 0; i < dnIds.length; ++i) { 207 dnIds[i] = new DatanodeID("192.168.0.1", 208 "fakehostname_" + i, "fake_datanode_id_" + i, 209 100, 101, 102, 103); 210 peers[i] = new FakePeer(dnIds[i], false); 211 } 212 for (int i = 0; i < CAPACITY; ++i) { 213 cache.put(dnIds[i], peers[i]); 214 } 215 // Check that the peers are cached 216 assertEquals(CAPACITY, cache.size()); 217 218 // Add another entry and check that the first entry was evicted 219 cache.put(dnIds[CAPACITY], peers[CAPACITY]); 220 assertEquals(CAPACITY, cache.size()); 221 assertSame(null, cache.get(dnIds[0], false)); 222 223 // Make sure that the other entries are still there 224 for (int i = 1; i < CAPACITY; ++i) { 225 Peer peer = cache.get(dnIds[i], false); 226 assertSame(peers[i], peer); 227 assertTrue(!peer.isClosed()); 228 peer.close(); 229 } 230 assertEquals(1, cache.size()); 231 cache.close(); 232 } 233 234 @Test 235 public void testMultiplePeersWithSameKey() throws Exception { 236 final int CAPACITY = 3; 237 PeerCache cache = new PeerCache(CAPACITY, 100000); 238 DatanodeID dnId = new DatanodeID("192.168.0.1", 239 "fakehostname", "fake_datanode_id", 240 100, 101, 102, 103); 241 HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY); 242 for (int i = 0; i < CAPACITY; ++i) { 243 FakePeer peer = new FakePeer(dnId, false); 244 peers.add(peer); 245 cache.put(dnId, peer); 246 } 247 // Check that all of the peers ended up in the cache 248 assertEquals(CAPACITY, cache.size()); 249 while (!peers.isEmpty()) { 250 Peer peer = cache.get(dnId, false); 251 assertTrue(peer != null); 252 assertTrue(!peer.isClosed()); 253 peers.remove(peer); 254 } 255 assertEquals(0, cache.size()); 256 cache.close(); 257 } 258 259 @Test 260 public void testDomainSocketPeers() throws Exception { 261 final int CAPACITY = 3; 262 PeerCache cache = new PeerCache(CAPACITY, 100000); 263 DatanodeID dnId = new DatanodeID("192.168.0.1", 264 "fakehostname", "fake_datanode_id", 265 100, 101, 102, 103); 266 HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY); 267 for (int i = 0; i < CAPACITY; ++i) { 268 FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1); 269 peers.add(peer); 270 cache.put(dnId, peer); 271 } 272 // Check that all of the peers ended up in the cache 273 assertEquals(CAPACITY, cache.size()); 274 // Test that get(requireDomainPeer=true) finds the peer with the 275 // domain socket. 276 Peer peer = cache.get(dnId, true); 277 assertTrue(peer.getDomainSocket() != null); 278 peers.remove(peer); 279 // Test that get(requireDomainPeer=true) returns null when there are 280 // no more peers with domain sockets. 281 peer = cache.get(dnId, true); 282 assertTrue(peer == null); 283 // Check that all of the other peers ended up in the cache. 284 while (!peers.isEmpty()) { 285 peer = cache.get(dnId, false); 286 assertTrue(peer != null); 287 assertTrue(!peer.isClosed()); 288 peers.remove(peer); 289 } 290 assertEquals(0, cache.size()); 291 cache.close(); 292 } 293 } 294