1 /*
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.apache.hadoop.hbase.replication;
20 
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Abortable;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
34 import org.apache.hadoop.hbase.exceptions.DeserializationException;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.zookeeper.KeeperException;
42 import org.apache.zookeeper.KeeperException.NodeExistsException;
43 
44 @InterfaceAudience.Private
45 public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
46   private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
47 
48   private final ReplicationPeerConfig peerConfig;
49   private final String id;
50   private volatile PeerState peerState;
51   private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
52   private final Configuration conf;
53 
54   private PeerStateTracker peerStateTracker;
55   private TableCFsTracker tableCFsTracker;
56 
57   /**
58    * Constructor that takes all the objects required to communicate with the specified peer, except
59    * for the region server addresses.
60    * @param conf configuration object to this peer
61    * @param id string representation of this peer's identifier
62    * @param peerConfig configuration for the replication peer
63    */
ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)64   public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
65       throws ReplicationException {
66     this.conf = conf;
67     this.peerConfig = peerConfig;
68     this.id = id;
69   }
70 
71   /**
72    * Constructor that takes all the objects required to communicate with the specified peer, except
73    * for the region server addresses.
74    * @param conf configuration object to this peer
75    * @param id string representation of this peer's identifier
76    * @param peerConfig configuration for the replication peer
77    * @param tableCFs table-cf configuration for this peer
78    */
ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, Map<TableName, List<String>> tableCFs)79   public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
80       Map<TableName, List<String>> tableCFs) throws ReplicationException {
81     this.conf = conf;
82     this.peerConfig = peerConfig;
83     this.id = id;
84     this.tableCFs = tableCFs;
85   }
86 
87   /**
88    * start a state tracker to check whether this peer is enabled or not
89    *
90    * @param zookeeper zk watcher for the local cluster
91    * @param peerStateNode path to zk node which stores peer state
92    * @throws KeeperException
93    */
startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)94   public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
95       throws KeeperException {
96     ensurePeerEnabled(zookeeper, peerStateNode);
97     this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
98     this.peerStateTracker.start();
99     try {
100       this.readPeerStateZnode();
101     } catch (DeserializationException e) {
102       throw ZKUtil.convert(e);
103     }
104   }
105 
readPeerStateZnode()106   private void readPeerStateZnode() throws DeserializationException {
107     this.peerState =
108         isStateEnabled(this.peerStateTracker.getData(false))
109           ? PeerState.ENABLED
110           : PeerState.DISABLED;
111   }
112 
113   /**
114    * start a table-cfs tracker to listen the (table, cf-list) map change
115    *
116    * @param zookeeper zk watcher for the local cluster
117    * @param tableCFsNode path to zk node which stores table-cfs
118    * @throws KeeperException
119    */
startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)120   public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
121     throws KeeperException {
122     this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
123         this);
124     this.tableCFsTracker.start();
125     this.readTableCFsZnode();
126   }
127 
readTableCFsZnode()128   private void readTableCFsZnode() {
129     String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
130     this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
131   }
132 
133   @Override
getPeerState()134   public PeerState getPeerState() {
135     return peerState;
136   }
137 
138   /**
139    * Get the identifier of this peer
140    * @return string representation of the id (short)
141    */
142   @Override
getId()143   public String getId() {
144     return id;
145   }
146 
147   /**
148    * Get the peer config object
149    * @return the ReplicationPeerConfig for this peer
150    */
151   @Override
getPeerConfig()152   public ReplicationPeerConfig getPeerConfig() {
153     return peerConfig;
154   }
155 
156   /**
157    * Get the configuration object required to communicate with this peer
158    * @return configuration object
159    */
160   @Override
getConfiguration()161   public Configuration getConfiguration() {
162     return conf;
163   }
164 
165   /**
166    * Get replicable (table, cf-list) map of this peer
167    * @return the replicable (table, cf-list) map
168    */
169   @Override
getTableCFs()170   public Map<TableName, List<String>> getTableCFs() {
171     return this.tableCFs;
172   }
173 
174   @Override
abort(String why, Throwable e)175   public void abort(String why, Throwable e) {
176     LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
177         + " was aborted for the following reason(s):" + why, e);
178   }
179 
180   @Override
isAborted()181   public boolean isAborted() {
182     // Currently the replication peer is never "Aborted", we just log when the
183     // abort method is called.
184     return false;
185   }
186 
187   @Override
close()188   public void close() throws IOException {
189     // TODO: stop zkw?
190   }
191 
192   /**
193    * Parse the raw data from ZK to get a peer's state
194    * @param bytes raw ZK data
195    * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
196    * @throws DeserializationException
197    */
isStateEnabled(final byte[] bytes)198   public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
199     ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
200     return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
201   }
202 
203   /**
204    * @param bytes Content of a state znode.
205    * @return State parsed from the passed bytes.
206    * @throws DeserializationException
207    */
parseStateFrom(final byte[] bytes)208   private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
209       throws DeserializationException {
210     ProtobufUtil.expectPBMagicPrefix(bytes);
211     int pblen = ProtobufUtil.lengthOfPBMagic();
212     ZooKeeperProtos.ReplicationState.Builder builder =
213         ZooKeeperProtos.ReplicationState.newBuilder();
214     ZooKeeperProtos.ReplicationState state;
215     try {
216       ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
217       state = builder.build();
218       return state.getState();
219     } catch (IOException e) {
220       throw new DeserializationException(e);
221     }
222   }
223 
224   /**
225    * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
226    * @param zookeeper
227    * @param path Path to znode to check
228    * @return True if we created the znode.
229    * @throws NodeExistsException
230    * @throws KeeperException
231    */
ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)232   private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
233       throws NodeExistsException, KeeperException {
234     if (ZKUtil.checkExists(zookeeper, path) == -1) {
235       // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
236       // peer-state znode. This happens while adding a peer.
237       // The peer state data is set as "ENABLED" by default.
238       ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
239         ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
240       return true;
241     }
242     return false;
243   }
244 
245   /**
246    * Tracker for state of this peer
247    */
248   public class PeerStateTracker extends ZooKeeperNodeTracker {
249 
PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher, Abortable abortable)250     public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
251         Abortable abortable) {
252       super(watcher, peerStateZNode, abortable);
253     }
254 
255     @Override
nodeDataChanged(String path)256     public synchronized void nodeDataChanged(String path) {
257       if (path.equals(node)) {
258         super.nodeDataChanged(path);
259         try {
260           readPeerStateZnode();
261         } catch (DeserializationException e) {
262           LOG.warn("Failed deserializing the content of " + path, e);
263         }
264       }
265     }
266   }
267 
268   /**
269    * Tracker for (table, cf-list) map of this peer
270    */
271   public class TableCFsTracker extends ZooKeeperNodeTracker {
272 
TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher, Abortable abortable)273     public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
274         Abortable abortable) {
275       super(watcher, tableCFsZNode, abortable);
276     }
277 
278     @Override
nodeCreated(String path)279     public synchronized void nodeCreated(String path) {
280       if (path.equals(node)) {
281         super.nodeCreated(path);
282         readTableCFsZnode();
283       }
284     }
285 
286     @Override
nodeDataChanged(String path)287     public synchronized void nodeDataChanged(String path) {
288       if (path.equals(node)) {
289         super.nodeDataChanged(path);
290       }
291     }
292   }
293 }
294