1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hbase.replication;
20 
21 import static org.junit.Assert.*;
22 
23 import java.util.List;
24 import java.util.SortedMap;
25 import java.util.SortedSet;
26 
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
31 import org.apache.zookeeper.KeeperException;
32 import org.junit.Before;
33 import org.junit.Test;
34 
35 /**
36  * White box testing for replication state interfaces. Implementations should extend this class, and
37  * initialize the interfaces properly.
38  */
39 public abstract class TestReplicationStateBasic {
40 
41   protected ReplicationQueues rq1;
42   protected ReplicationQueues rq2;
43   protected ReplicationQueues rq3;
44   protected ReplicationQueuesClient rqc;
45   protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
46   protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
47   protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
48   protected ReplicationPeers rp;
49   protected static final String ID_ONE = "1";
50   protected static final String ID_TWO = "2";
51   protected static String KEY_ONE;
52   protected static String KEY_TWO;
53 
54   // For testing when we try to replicate to ourself
55   protected String OUR_ID = "3";
56   protected String OUR_KEY;
57 
58   protected static int zkTimeoutCount;
59   protected static final int ZK_MAX_COUNT = 300;
60   protected static final int ZK_SLEEP_INTERVAL = 100; // millis
61 
62   private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
63 
64   @Before
setUp()65   public void setUp() {
66     zkTimeoutCount = 0;
67   }
68 
69   @Test
testReplicationQueuesClient()70   public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
71     rqc.init();
72     // Test methods with empty state
73     assertEquals(0, rqc.getListOfReplicators().size());
74     assertNull(rqc.getLogsInQueue(server1, "qId1"));
75     assertNull(rqc.getAllQueues(server1));
76 
77     /*
78      * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
79      * server2: zero queues
80      */
81     rq1.init(server1);
82     rq2.init(server2);
83     rq1.addLog("qId1", "trash");
84     rq1.removeLog("qId1", "trash");
85     rq1.addLog("qId2", "filename1");
86     rq1.addLog("qId3", "filename2");
87     rq1.addLog("qId3", "filename3");
88     rq2.addLog("trash", "trash");
89     rq2.removeQueue("trash");
90 
91     List<String> reps = rqc.getListOfReplicators();
92     assertEquals(2, reps.size());
93     assertTrue(server1, reps.contains(server1));
94     assertTrue(server2, reps.contains(server2));
95 
96     assertNull(rqc.getLogsInQueue("bogus", "bogus"));
97     assertNull(rqc.getLogsInQueue(server1, "bogus"));
98     assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
99     assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
100     assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
101 
102     assertNull(rqc.getAllQueues("bogus"));
103     assertEquals(0, rqc.getAllQueues(server2).size());
104     List<String> list = rqc.getAllQueues(server1);
105     assertEquals(3, list.size());
106     assertTrue(list.contains("qId2"));
107     assertTrue(list.contains("qId3"));
108   }
109 
110   @Test
testReplicationQueues()111   public void testReplicationQueues() throws ReplicationException {
112     rq1.init(server1);
113     rq2.init(server2);
114     rq3.init(server3);
115     //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
116     rp.init();
117 
118     // 3 replicators should exist
119     assertEquals(3, rq1.getListOfReplicators().size());
120     rq1.removeQueue("bogus");
121     rq1.removeLog("bogus", "bogus");
122     rq1.removeAllQueues();
123     assertNull(rq1.getAllQueues());
124     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
125     assertNull(rq1.getLogsInQueue("bogus"));
126     assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
127 
128     rq1.setLogPosition("bogus", "bogus", 5L);
129 
130     populateQueues();
131 
132     assertEquals(3, rq1.getListOfReplicators().size());
133     assertEquals(0, rq2.getLogsInQueue("qId1").size());
134     assertEquals(5, rq3.getLogsInQueue("qId5").size());
135     assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
136     rq3.setLogPosition("qId5", "filename4", 354L);
137     assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
138 
139     assertEquals(5, rq3.getLogsInQueue("qId5").size());
140     assertEquals(0, rq2.getLogsInQueue("qId1").size());
141     assertEquals(0, rq1.getAllQueues().size());
142     assertEquals(1, rq2.getAllQueues().size());
143     assertEquals(5, rq3.getAllQueues().size());
144 
145     assertEquals(0, rq3.claimQueues(server1).size());
146     assertEquals(2, rq3.getListOfReplicators().size());
147 
148     SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
149     assertEquals(5, queues.size());
150     assertEquals(1, rq2.getListOfReplicators().size());
151 
152     // Try to claim our own queues
153     assertEquals(0, rq2.claimQueues(server2).size());
154 
155     assertEquals(6, rq2.getAllQueues().size());
156 
157     rq2.removeAllQueues();
158 
159     assertEquals(0, rq2.getListOfReplicators().size());
160   }
161 
162   @Test
testReplicationPeers()163   public void testReplicationPeers() throws Exception {
164     rp.init();
165 
166     // Test methods with non-existent peer ids
167     try {
168       rp.removePeer("bogus");
169       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
170     } catch (IllegalArgumentException e) {
171     }
172     try {
173       rp.enablePeer("bogus");
174       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
175     } catch (IllegalArgumentException e) {
176     }
177     try {
178       rp.disablePeer("bogus");
179       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
180     } catch (IllegalArgumentException e) {
181     }
182     try {
183       rp.getStatusOfPeer("bogus");
184       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
185     } catch (IllegalArgumentException e) {
186     }
187     assertFalse(rp.peerAdded("bogus"));
188     rp.peerRemoved("bogus");
189 
190     assertNull(rp.getPeerConf("bogus"));
191     assertNumberOfPeers(0);
192 
193     // Add some peers
194     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
195     assertNumberOfPeers(1);
196     rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
197     assertNumberOfPeers(2);
198 
199     // Test methods with a peer that is added but not connected
200     try {
201       rp.getStatusOfPeer(ID_ONE);
202       fail("There are no connected peers, should have thrown an IllegalArgumentException");
203     } catch (IllegalArgumentException e) {
204     }
205     assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
206     rp.removePeer(ID_ONE);
207     rp.peerRemoved(ID_ONE);
208     assertNumberOfPeers(1);
209 
210     // Add one peer
211     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
212     rp.peerAdded(ID_ONE);
213     assertNumberOfPeers(2);
214     assertTrue(rp.getStatusOfPeer(ID_ONE));
215     rp.disablePeer(ID_ONE);
216     assertConnectedPeerStatus(false, ID_ONE);
217     rp.enablePeer(ID_ONE);
218     assertConnectedPeerStatus(true, ID_ONE);
219 
220     // Disconnect peer
221     rp.peerRemoved(ID_ONE);
222     assertNumberOfPeers(2);
223     try {
224       rp.getStatusOfPeer(ID_ONE);
225       fail("There are no connected peers, should have thrown an IllegalArgumentException");
226     } catch (IllegalArgumentException e) {
227     }
228   }
229 
assertConnectedPeerStatus(boolean status, String peerId)230   protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
231     // we can first check if the value was changed in the store, if it wasn't then fail right away
232     if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
233       fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
234     }
235     while (true) {
236       if (status == rp.getStatusOfPeer(peerId)) {
237         return;
238       }
239       if (zkTimeoutCount < ZK_MAX_COUNT) {
240         LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
241             + ", sleeping and trying again.");
242         Thread.sleep(ZK_SLEEP_INTERVAL);
243       } else {
244         fail("Timed out waiting for ConnectedPeerStatus to be " + status);
245       }
246     }
247   }
248 
assertNumberOfPeers(int total)249   protected void assertNumberOfPeers(int total) {
250     assertEquals(total, rp.getAllPeerConfigs().size());
251     assertEquals(total, rp.getAllPeerIds().size());
252     assertEquals(total, rp.getAllPeerIds().size());
253   }
254 
255   /*
256    * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
257    * 3, 4, 5 log files respectively
258    */
populateQueues()259   protected void populateQueues() throws ReplicationException {
260     rq1.addLog("trash", "trash");
261     rq1.removeQueue("trash");
262 
263     rq2.addLog("qId1", "trash");
264     rq2.removeLog("qId1", "trash");
265 
266     for (int i = 1; i < 6; i++) {
267       for (int j = 0; j < i; j++) {
268         rq3.addLog("qId" + i, "filename" + j);
269       }
270       //Add peers for the corresponding queues so they are not orphans
271       rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
272     }
273   }
274 }
275 
276