1 /* 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.replication; 20 21 import java.io.Closeable; 22 import java.io.IOException; 23 import java.util.HashMap; 24 import java.util.List; 25 import java.util.Map; 26 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 import org.apache.hadoop.hbase.classification.InterfaceAudience; 30 import org.apache.hadoop.conf.Configuration; 31 import org.apache.hadoop.hbase.Abortable; 32 import org.apache.hadoop.hbase.TableName; 33 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; 34 import org.apache.hadoop.hbase.exceptions.DeserializationException; 35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; 37 import org.apache.hadoop.hbase.util.Bytes; 38 import org.apache.hadoop.hbase.zookeeper.ZKUtil; 39 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; 40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; 41 import org.apache.zookeeper.KeeperException; 42 import org.apache.zookeeper.KeeperException.NodeExistsException; 43 44 @InterfaceAudience.Private 45 public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable { 46 private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class); 47 48 private final ReplicationPeerConfig peerConfig; 49 private final String id; 50 private volatile PeerState peerState; 51 private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>(); 52 private final Configuration conf; 53 54 private PeerStateTracker peerStateTracker; 55 private TableCFsTracker tableCFsTracker; 56 57 /** 58 * Constructor that takes all the objects required to communicate with the specified peer, except 59 * for the region server addresses. 60 * @param conf configuration object to this peer 61 * @param id string representation of this peer's identifier 62 * @param peerConfig configuration for the replication peer 63 */ ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)64 public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig) 65 throws ReplicationException { 66 this.conf = conf; 67 this.peerConfig = peerConfig; 68 this.id = id; 69 } 70 71 /** 72 * Constructor that takes all the objects required to communicate with the specified peer, except 73 * for the region server addresses. 74 * @param conf configuration object to this peer 75 * @param id string representation of this peer's identifier 76 * @param peerConfig configuration for the replication peer 77 * @param tableCFs table-cf configuration for this peer 78 */ ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, Map<TableName, List<String>> tableCFs)79 public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, 80 Map<TableName, List<String>> tableCFs) throws ReplicationException { 81 this.conf = conf; 82 this.peerConfig = peerConfig; 83 this.id = id; 84 this.tableCFs = tableCFs; 85 } 86 87 /** 88 * start a state tracker to check whether this peer is enabled or not 89 * 90 * @param zookeeper zk watcher for the local cluster 91 * @param peerStateNode path to zk node which stores peer state 92 * @throws KeeperException 93 */ startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)94 public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode) 95 throws KeeperException { 96 ensurePeerEnabled(zookeeper, peerStateNode); 97 this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); 98 this.peerStateTracker.start(); 99 try { 100 this.readPeerStateZnode(); 101 } catch (DeserializationException e) { 102 throw ZKUtil.convert(e); 103 } 104 } 105 readPeerStateZnode()106 private void readPeerStateZnode() throws DeserializationException { 107 this.peerState = 108 isStateEnabled(this.peerStateTracker.getData(false)) 109 ? PeerState.ENABLED 110 : PeerState.DISABLED; 111 } 112 113 /** 114 * start a table-cfs tracker to listen the (table, cf-list) map change 115 * 116 * @param zookeeper zk watcher for the local cluster 117 * @param tableCFsNode path to zk node which stores table-cfs 118 * @throws KeeperException 119 */ startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)120 public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode) 121 throws KeeperException { 122 this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper, 123 this); 124 this.tableCFsTracker.start(); 125 this.readTableCFsZnode(); 126 } 127 readTableCFsZnode()128 private void readTableCFsZnode() { 129 String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false)); 130 this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs); 131 } 132 133 @Override getPeerState()134 public PeerState getPeerState() { 135 return peerState; 136 } 137 138 /** 139 * Get the identifier of this peer 140 * @return string representation of the id (short) 141 */ 142 @Override getId()143 public String getId() { 144 return id; 145 } 146 147 /** 148 * Get the peer config object 149 * @return the ReplicationPeerConfig for this peer 150 */ 151 @Override getPeerConfig()152 public ReplicationPeerConfig getPeerConfig() { 153 return peerConfig; 154 } 155 156 /** 157 * Get the configuration object required to communicate with this peer 158 * @return configuration object 159 */ 160 @Override getConfiguration()161 public Configuration getConfiguration() { 162 return conf; 163 } 164 165 /** 166 * Get replicable (table, cf-list) map of this peer 167 * @return the replicable (table, cf-list) map 168 */ 169 @Override getTableCFs()170 public Map<TableName, List<String>> getTableCFs() { 171 return this.tableCFs; 172 } 173 174 @Override abort(String why, Throwable e)175 public void abort(String why, Throwable e) { 176 LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig 177 + " was aborted for the following reason(s):" + why, e); 178 } 179 180 @Override isAborted()181 public boolean isAborted() { 182 // Currently the replication peer is never "Aborted", we just log when the 183 // abort method is called. 184 return false; 185 } 186 187 @Override close()188 public void close() throws IOException { 189 // TODO: stop zkw? 190 } 191 192 /** 193 * Parse the raw data from ZK to get a peer's state 194 * @param bytes raw ZK data 195 * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state. 196 * @throws DeserializationException 197 */ isStateEnabled(final byte[] bytes)198 public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { 199 ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes); 200 return ZooKeeperProtos.ReplicationState.State.ENABLED == state; 201 } 202 203 /** 204 * @param bytes Content of a state znode. 205 * @return State parsed from the passed bytes. 206 * @throws DeserializationException 207 */ parseStateFrom(final byte[] bytes)208 private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes) 209 throws DeserializationException { 210 ProtobufUtil.expectPBMagicPrefix(bytes); 211 int pblen = ProtobufUtil.lengthOfPBMagic(); 212 ZooKeeperProtos.ReplicationState.Builder builder = 213 ZooKeeperProtos.ReplicationState.newBuilder(); 214 ZooKeeperProtos.ReplicationState state; 215 try { 216 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 217 state = builder.build(); 218 return state.getState(); 219 } catch (IOException e) { 220 throw new DeserializationException(e); 221 } 222 } 223 224 /** 225 * Utility method to ensure an ENABLED znode is in place; if not present, we create it. 226 * @param zookeeper 227 * @param path Path to znode to check 228 * @return True if we created the znode. 229 * @throws NodeExistsException 230 * @throws KeeperException 231 */ ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)232 private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path) 233 throws NodeExistsException, KeeperException { 234 if (ZKUtil.checkExists(zookeeper, path) == -1) { 235 // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the 236 // peer-state znode. This happens while adding a peer. 237 // The peer state data is set as "ENABLED" by default. 238 ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, 239 ReplicationStateZKBase.ENABLED_ZNODE_BYTES); 240 return true; 241 } 242 return false; 243 } 244 245 /** 246 * Tracker for state of this peer 247 */ 248 public class PeerStateTracker extends ZooKeeperNodeTracker { 249 PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher, Abortable abortable)250 public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher, 251 Abortable abortable) { 252 super(watcher, peerStateZNode, abortable); 253 } 254 255 @Override nodeDataChanged(String path)256 public synchronized void nodeDataChanged(String path) { 257 if (path.equals(node)) { 258 super.nodeDataChanged(path); 259 try { 260 readPeerStateZnode(); 261 } catch (DeserializationException e) { 262 LOG.warn("Failed deserializing the content of " + path, e); 263 } 264 } 265 } 266 } 267 268 /** 269 * Tracker for (table, cf-list) map of this peer 270 */ 271 public class TableCFsTracker extends ZooKeeperNodeTracker { 272 TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher, Abortable abortable)273 public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher, 274 Abortable abortable) { 275 super(watcher, tableCFsZNode, abortable); 276 } 277 278 @Override nodeCreated(String path)279 public synchronized void nodeCreated(String path) { 280 if (path.equals(node)) { 281 super.nodeCreated(path); 282 readTableCFsZnode(); 283 } 284 } 285 286 @Override nodeDataChanged(String path)287 public synchronized void nodeDataChanged(String path) { 288 if (path.equals(node)) { 289 super.nodeDataChanged(path); 290 } 291 } 292 } 293 } 294