1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hbase.replication; 20 21 import static org.junit.Assert.*; 22 23 import java.util.List; 24 import java.util.SortedMap; 25 import java.util.SortedSet; 26 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 import org.apache.hadoop.hbase.ServerName; 30 import org.apache.hadoop.hbase.zookeeper.ZKConfig; 31 import org.apache.zookeeper.KeeperException; 32 import org.junit.Before; 33 import org.junit.Test; 34 35 /** 36 * White box testing for replication state interfaces. Implementations should extend this class, and 37 * initialize the interfaces properly. 38 */ 39 public abstract class TestReplicationStateBasic { 40 41 protected ReplicationQueues rq1; 42 protected ReplicationQueues rq2; 43 protected ReplicationQueues rq3; 44 protected ReplicationQueuesClient rqc; 45 protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString(); 46 protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString(); 47 protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString(); 48 protected ReplicationPeers rp; 49 protected static final String ID_ONE = "1"; 50 protected static final String ID_TWO = "2"; 51 protected static String KEY_ONE; 52 protected static String KEY_TWO; 53 54 // For testing when we try to replicate to ourself 55 protected String OUR_ID = "3"; 56 protected String OUR_KEY; 57 58 protected static int zkTimeoutCount; 59 protected static final int ZK_MAX_COUNT = 300; 60 protected static final int ZK_SLEEP_INTERVAL = 100; // millis 61 62 private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class); 63 64 @Before setUp()65 public void setUp() { 66 zkTimeoutCount = 0; 67 } 68 69 @Test testReplicationQueuesClient()70 public void testReplicationQueuesClient() throws ReplicationException, KeeperException { 71 rqc.init(); 72 // Test methods with empty state 73 assertEquals(0, rqc.getListOfReplicators().size()); 74 assertNull(rqc.getLogsInQueue(server1, "qId1")); 75 assertNull(rqc.getAllQueues(server1)); 76 77 /* 78 * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- 79 * server2: zero queues 80 */ 81 rq1.init(server1); 82 rq2.init(server2); 83 rq1.addLog("qId1", "trash"); 84 rq1.removeLog("qId1", "trash"); 85 rq1.addLog("qId2", "filename1"); 86 rq1.addLog("qId3", "filename2"); 87 rq1.addLog("qId3", "filename3"); 88 rq2.addLog("trash", "trash"); 89 rq2.removeQueue("trash"); 90 91 List<String> reps = rqc.getListOfReplicators(); 92 assertEquals(2, reps.size()); 93 assertTrue(server1, reps.contains(server1)); 94 assertTrue(server2, reps.contains(server2)); 95 96 assertNull(rqc.getLogsInQueue("bogus", "bogus")); 97 assertNull(rqc.getLogsInQueue(server1, "bogus")); 98 assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size()); 99 assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size()); 100 assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0)); 101 102 assertNull(rqc.getAllQueues("bogus")); 103 assertEquals(0, rqc.getAllQueues(server2).size()); 104 List<String> list = rqc.getAllQueues(server1); 105 assertEquals(3, list.size()); 106 assertTrue(list.contains("qId2")); 107 assertTrue(list.contains("qId3")); 108 } 109 110 @Test testReplicationQueues()111 public void testReplicationQueues() throws ReplicationException { 112 rq1.init(server1); 113 rq2.init(server2); 114 rq3.init(server3); 115 //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) 116 rp.init(); 117 118 // 3 replicators should exist 119 assertEquals(3, rq1.getListOfReplicators().size()); 120 rq1.removeQueue("bogus"); 121 rq1.removeLog("bogus", "bogus"); 122 rq1.removeAllQueues(); 123 assertNull(rq1.getAllQueues()); 124 assertEquals(0, rq1.getLogPosition("bogus", "bogus")); 125 assertNull(rq1.getLogsInQueue("bogus")); 126 assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size()); 127 128 rq1.setLogPosition("bogus", "bogus", 5L); 129 130 populateQueues(); 131 132 assertEquals(3, rq1.getListOfReplicators().size()); 133 assertEquals(0, rq2.getLogsInQueue("qId1").size()); 134 assertEquals(5, rq3.getLogsInQueue("qId5").size()); 135 assertEquals(0, rq3.getLogPosition("qId1", "filename0")); 136 rq3.setLogPosition("qId5", "filename4", 354L); 137 assertEquals(354L, rq3.getLogPosition("qId5", "filename4")); 138 139 assertEquals(5, rq3.getLogsInQueue("qId5").size()); 140 assertEquals(0, rq2.getLogsInQueue("qId1").size()); 141 assertEquals(0, rq1.getAllQueues().size()); 142 assertEquals(1, rq2.getAllQueues().size()); 143 assertEquals(5, rq3.getAllQueues().size()); 144 145 assertEquals(0, rq3.claimQueues(server1).size()); 146 assertEquals(2, rq3.getListOfReplicators().size()); 147 148 SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3); 149 assertEquals(5, queues.size()); 150 assertEquals(1, rq2.getListOfReplicators().size()); 151 152 // Try to claim our own queues 153 assertEquals(0, rq2.claimQueues(server2).size()); 154 155 assertEquals(6, rq2.getAllQueues().size()); 156 157 rq2.removeAllQueues(); 158 159 assertEquals(0, rq2.getListOfReplicators().size()); 160 } 161 162 @Test testReplicationPeers()163 public void testReplicationPeers() throws Exception { 164 rp.init(); 165 166 // Test methods with non-existent peer ids 167 try { 168 rp.removePeer("bogus"); 169 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 170 } catch (IllegalArgumentException e) { 171 } 172 try { 173 rp.enablePeer("bogus"); 174 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 175 } catch (IllegalArgumentException e) { 176 } 177 try { 178 rp.disablePeer("bogus"); 179 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 180 } catch (IllegalArgumentException e) { 181 } 182 try { 183 rp.getStatusOfPeer("bogus"); 184 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); 185 } catch (IllegalArgumentException e) { 186 } 187 assertFalse(rp.peerAdded("bogus")); 188 rp.peerRemoved("bogus"); 189 190 assertNull(rp.getPeerConf("bogus")); 191 assertNumberOfPeers(0); 192 193 // Add some peers 194 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); 195 assertNumberOfPeers(1); 196 rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null); 197 assertNumberOfPeers(2); 198 199 // Test methods with a peer that is added but not connected 200 try { 201 rp.getStatusOfPeer(ID_ONE); 202 fail("There are no connected peers, should have thrown an IllegalArgumentException"); 203 } catch (IllegalArgumentException e) { 204 } 205 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); 206 rp.removePeer(ID_ONE); 207 rp.peerRemoved(ID_ONE); 208 assertNumberOfPeers(1); 209 210 // Add one peer 211 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null); 212 rp.peerAdded(ID_ONE); 213 assertNumberOfPeers(2); 214 assertTrue(rp.getStatusOfPeer(ID_ONE)); 215 rp.disablePeer(ID_ONE); 216 assertConnectedPeerStatus(false, ID_ONE); 217 rp.enablePeer(ID_ONE); 218 assertConnectedPeerStatus(true, ID_ONE); 219 220 // Disconnect peer 221 rp.peerRemoved(ID_ONE); 222 assertNumberOfPeers(2); 223 try { 224 rp.getStatusOfPeer(ID_ONE); 225 fail("There are no connected peers, should have thrown an IllegalArgumentException"); 226 } catch (IllegalArgumentException e) { 227 } 228 } 229 assertConnectedPeerStatus(boolean status, String peerId)230 protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { 231 // we can first check if the value was changed in the store, if it wasn't then fail right away 232 if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { 233 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); 234 } 235 while (true) { 236 if (status == rp.getStatusOfPeer(peerId)) { 237 return; 238 } 239 if (zkTimeoutCount < ZK_MAX_COUNT) { 240 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status 241 + ", sleeping and trying again."); 242 Thread.sleep(ZK_SLEEP_INTERVAL); 243 } else { 244 fail("Timed out waiting for ConnectedPeerStatus to be " + status); 245 } 246 } 247 } 248 assertNumberOfPeers(int total)249 protected void assertNumberOfPeers(int total) { 250 assertEquals(total, rp.getAllPeerConfigs().size()); 251 assertEquals(total, rp.getAllPeerIds().size()); 252 assertEquals(total, rp.getAllPeerIds().size()); 253 } 254 255 /* 256 * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, 257 * 3, 4, 5 log files respectively 258 */ populateQueues()259 protected void populateQueues() throws ReplicationException { 260 rq1.addLog("trash", "trash"); 261 rq1.removeQueue("trash"); 262 263 rq2.addLog("qId1", "trash"); 264 rq2.removeLog("qId1", "trash"); 265 266 for (int i = 1; i < 6; i++) { 267 for (int j = 0; j < i; j++) { 268 rq3.addLog("qId" + i, "filename" + j); 269 } 270 //Add peers for the corresponding queues so they are not orphans 271 rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null); 272 } 273 } 274 } 275 276