/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.contrib.bkjournal;

import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.conf.Configuration;

import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.util.ZkUtils;

import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.ZKUtil;

import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.io.IOException;

import java.net.URI;

import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;

/**
 * BookKeeper Journal Manager
 *
 * To use, add the following to hdfs-site.xml.
 * <pre>
 * {@code
 * <property>
 *   <name>dfs.namenode.edits.dir</name>
 *   <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
 * </property>
 *
 * <property>
 *   <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
 *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
 * </property>
 * }
 * </pre>
 * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
 * [zookeeper ensemble] is a list of semi-colon separated, zookeeper host:port
 * pairs. In the example above there are 3 servers in the ensemble,
 * zk1, zk2 and zk3, each one listening on port 2181.
 *
 * [root znode] is the path of the zookeeper znode, under which the editlog
 * information will be stored.
 *
 * Other configuration options are:
 * <ul>
 *   <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
 *       Number of bytes a bookkeeper journal stream will buffer before
 *       forcing a flush. Default is 1024.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
 *       Number of bookkeeper servers in edit log ledger ensembles. This
 *       is the number of bookkeeper servers which need to be available
 *       for the ledger to be writable. Default is 3.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
 *       Number of bookkeeper servers in the write quorum. This is the
 *       number of bookkeeper servers which must have acknowledged the
 *       write of an entry before it is considered written.
 *       Default is 2.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.ack.quorum-size</b>
 *       Ack quorum size passed to BookKeeper when creating a ledger.
 *       Defaults to the configured quorum-size.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
 *       Password to use when creating ledgers. Default is the empty
 *       string.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.zk.session.timeout</b>
 *       Session timeout for the Zookeeper client used by the BookKeeper
 *       Journal Manager. Hadoop recommends that this value be less than
 *       the ZKFC session timeout value. Default value is 3000.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.zk.availablebookies</b>
 *       Znode path which is pre-created for the available bookies.
 *       Default is /ledgers/available.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs</b>
 *       Speculative read timeout handed to the BookKeeper client.
 *       Default is 2000.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.readEntryTimeoutSec</b>
 *       Read entry timeout handed to the BookKeeper client.
 *       Default is 5.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.addEntryTimeoutSec</b>
 *       Add entry timeout handed to the BookKeeper client.
 *       Default is 5.</li>
 * </ul>
 */
public class BookKeeperJournalManager implements JournalManager {
  static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);

  public static final String BKJM_OUTPUT_BUFFER_SIZE
    = "dfs.namenode.bookkeeperjournal.output-buffer-size";
  public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;

  public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
    = "dfs.namenode.bookkeeperjournal.ensemble-size";
  public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;

  public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
    = "dfs.namenode.bookkeeperjournal.quorum-size";
  public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;

  public static final String BKJM_BOOKKEEPER_DIGEST_PW
    = "dfs.namenode.bookkeeperjournal.digestPw";
  public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";

  private static final int BKJM_LAYOUT_VERSION = -1;

  public static final String BKJM_ZK_SESSION_TIMEOUT
    = "dfs.namenode.bookkeeperjournal.zk.session.timeout";
  public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;

  /** Name prefix of the znodes describing segments still being written. */
  private static final String BKJM_EDIT_INPROGRESS = "inprogress_";

  public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH
    = "dfs.namenode.bookkeeperjournal.zk.availablebookies";

  public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
    = "/ledgers/available";

  public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
    = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
  public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
    = 2000;

  public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
    = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
  public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;

  public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE
    = "dfs.namenode.bookkeeperjournal.ack.quorum-size";

  public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC
    = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
  public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;

  // Not final: tests may swap the client through setZooKeeper().
  private ZooKeeper zkc;
  private final Configuration conf;
  private final BookKeeper bkc;
  private final CurrentInprogress ci;
  private final String basePath;
  private final String ledgerPath;
  private final String versionPath;
  private final MaxTxId maxTxId;
  private final int ensembleSize;
  private final int quorumSize;
  private final int ackQuorumSize;
  private final int addEntryTimeout;
  private final String digestpw;
  private final int speculativeReadTimeout;
  private final int readEntryTimeout;
  private final CountDownLatch zkConnectLatch;
  private final NamespaceInfo nsInfo;
  // Set once checkEnv() has validated the version znode.
  private boolean initialized = false;
  // Ledger backing the segment currently being written, if any.
  private LedgerHandle currentLedger = null;

  /**
   * Construct a Bookkeeper journal manager.
   *
   * Connects to the zookeeper ensemble named by the URI authority,
   * pre-creates the bookie available path and creates the BookKeeper client.
   * If any of these steps fails, the zookeeper client opened here is closed
   * again before the exception is propagated.
   *
   * @param conf configuration the journal settings are read from
   * @param uri journal URI; the authority is the ";"-separated zookeeper
   *            ensemble and the path is the root znode of the journal
   * @param nsInfo namespace the running process belongs to; compared with
   *               the namespace stored in zookeeper before the journal is used
   * @throws IOException if zookeeper cannot be reached within the session
   *                     timeout plus a grace period, if the environment
   *                     cannot be prepared, or if the thread is interrupted
   */
  public BookKeeperJournalManager(Configuration conf, URI uri,
      NamespaceInfo nsInfo) throws IOException {
    this.conf = conf;
    this.nsInfo = nsInfo;

    // The URI separates ensemble members with ';', zookeeper expects ','.
    String zkConnect = uri.getAuthority().replace(";", ",");
    basePath = uri.getPath();
    ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
                               BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
    quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
                             BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
    ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
    addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
                             BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT);
    speculativeReadTimeout = conf.getInt(
                             BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
                             BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
    readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
                             BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);

    ledgerPath = basePath + "/ledgers";
    String maxTxIdPath = basePath + "/maxtxid";
    String currentInprogressNodePath = basePath + "/CurrentInprogress";
    versionPath = basePath + "/version";
    digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
                        BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);

    boolean clientsReady = false;
    try {
      zkConnectLatch = new CountDownLatch(1);
      int bkjmZKSessionTimeout = conf.getInt(BKJM_ZK_SESSION_TIMEOUT,
          BKJM_ZK_SESSION_TIMEOUT_DEFAULT);
      zkc = new ZooKeeper(zkConnect, bkjmZKSessionTimeout,
          new ZkConnectionWatcher());
      // Configured zk session timeout + some extra grace period (here
      // BKJM_ZK_SESSION_TIMEOUT_DEFAULT used as grace period)
      int zkConnectionLatchTimeout = bkjmZKSessionTimeout
          + BKJM_ZK_SESSION_TIMEOUT_DEFAULT;
      if (!zkConnectLatch
          .await(zkConnectionLatchTimeout, TimeUnit.MILLISECONDS)) {
        throw new IOException("Error connecting to zookeeper");
      }

      prepareBookKeeperEnv();
      ClientConfiguration clientConf = new ClientConfiguration();
      clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
      clientConf.setReadEntryTimeout(readEntryTimeout);
      clientConf.setAddEntryTimeout(addEntryTimeout);
      bkc = new BookKeeper(clientConf, zkc);
      clientsReady = true;
    } catch (KeeperException e) {
      throw new IOException("Error initializing zk", e);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while initializing bk journal manager",
                            ie);
    } finally {
      if (!clientsReady) {
        // Construction failed: nobody can call close() on this instance, so
        // release the zookeeper client here instead of leaking it.
        closeZooKeeperQuietly();
      }
    }

    ci = new CurrentInprogress(zkc, currentInprogressNodePath);
    maxTxId = new MaxTxId(zkc, maxTxIdPath);
  }

  /**
   * Close the zookeeper client, if one was created, logging instead of
   * propagating an interruption. Used only to clean up after a failed
   * construction.
   */
  private void closeZooKeeperQuietly() {
    if (zkc == null) {
      return;
    }
    try {
      zkc.close();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while closing zookeeper client", ie);
    }
  }

  /**
   * Pre-creating bookkeeper metadata path in zookeeper.
   *
   * The path is created asynchronously; this method then waits up to the
   * zookeeper session timeout for the creation callback.
   *
   * @throws IOException if the callback reports a failure, does not arrive
   *                     in time, or the thread is interrupted while waiting
   */
  private void prepareBookKeeperEnv() throws IOException {
    // create bookie available path in zookeeper if it doesn't exists
    final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH,
        BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
    final CountDownLatch zkPathLatch = new CountDownLatch(1);

    final AtomicBoolean success = new AtomicBoolean(false);
    StringCallback callback = new StringCallback() {
      @Override
      public void processResult(int rc, String path, Object ctx, String name) {
        // An already existing node is as good as a freshly created one.
        if (KeeperException.Code.OK.intValue() == rc
            || KeeperException.Code.NODEEXISTS.intValue() == rc) {
          LOG.info("Successfully created bookie available path : "
              + zkAvailablePath);
          success.set(true);
        } else {
          KeeperException.Code code = KeeperException.Code.get(rc);
          LOG.error("Error : "
                  + KeeperException.create(code, path).getMessage()
                  + ", failed to create bookie available path : "
                  + zkAvailablePath);
        }
        zkPathLatch.countDown();
      }
    };
    ZkUtils.asyncCreateFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);

    try {
      if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)
          || !success.get()) {
        throw new IOException("Couldn't create bookie available path :"
            + zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
            + " millis");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(
          "Interrupted when creating the bookie available path : "
              + zkAvailablePath, e);
    }
  }

  /**
   * Format the journal: delete the ledgers and znodes of any previous
   * journal under the root znode, then recreate the root, version and
   * ledgers znodes. The version znode records the given namespace and the
   * journal layout version.
   *
   * @param ns namespace information stored in the version znode
   * @throws IOException on zookeeper or bookkeeper errors, or if interrupted
   */
  @Override
  public void format(NamespaceInfo ns) throws IOException {
    try {
      // delete old info
      if (zkc.exists(basePath, false) != null) {
        if (zkc.exists(ledgerPath, false) != null) {
          for (EditLogLedgerMetadata l : getLedgerList(true)) {
            try {
              bkc.deleteLedger(l.getLedgerId());
            } catch (BKException.BKNoSuchLedgerExistsException bke) {
              LOG.warn("Ledger " + l.getLedgerId() + " does not exist;"
                  + " Cannot delete.");
            }
          }
        }
        ZKUtil.deleteRecursive(zkc, basePath);
      }

      // should be clean now.
      zkc.create(basePath, new byte[] {'0'},
          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      VersionProto.Builder builder = VersionProto.newBuilder();
      builder.setNamespaceInfo(PBHelper.convert(ns))
        .setLayoutVersion(BKJM_LAYOUT_VERSION);

      byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
      zkc.create(versionPath, data,
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      zkc.create(ledgerPath, new byte[] {'0'},
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (KeeperException ke) {
      LOG.error("Error accessing zookeeper to format", ke);
      throw new IOException("Error accessing zookeeper to format", ke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted during format", ie);
    } catch (BKException bke) {
      throw new IOException("Error cleaning up ledgers during format", bke);
    }
  }

  /**
   * @return true if the root znode of this journal exists in zookeeper
   * @throws IOException if zookeeper cannot be queried or the thread is
   *                     interrupted
   */
  @Override
  public boolean hasSomeData() throws IOException {
    try {
      return zkc.exists(basePath, false) != null;
    } catch (KeeperException ke) {
      throw new IOException("Couldn't contact zookeeper", ke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while checking for data", ie);
    }
  }

  /**
   * Verify, once per instance, that the journal has been formatted and that
   * the namespace stored in the version znode matches the namespace this
   * process was started with. On success the current-inprogress tracker is
   * initialized and later calls return immediately.
   *
   * @throws IOException if the version znode is missing or incomplete, the
   *                     namespaces differ, zookeeper fails, or the thread is
   *                     interrupted
   */
  private synchronized void checkEnv() throws IOException {
    if (initialized) {
      return;
    }
    try {
      Stat versionStat = zkc.exists(versionPath, false);
      if (versionStat == null) {
        throw new IOException("Environment not initialized. "
                              +"Have you forgotten to format?");
      }
      byte[] d = zkc.getData(versionPath, false, versionStat);

      VersionProto.Builder builder = VersionProto.newBuilder();
      TextFormat.merge(new String(d, UTF_8), builder);
      if (!builder.isInitialized()) {
        throw new IOException("Invalid/Incomplete data in znode");
      }
      VersionProto vp = builder.build();

      // There's only one version at the moment
      assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;

      NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());

      if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
          !nsInfo.getClusterID().equals(readns.getClusterID()) ||
          !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
        String err = String.format("Environment mismatch. Running process %s"
                                   +", stored in ZK %s", nsInfo, readns);
        LOG.error(err);
        throw new IOException(err);
      }

      ci.init();
      initialized = true;
    } catch (KeeperException ke) {
      throw new IOException("Cannot access ZooKeeper", ke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while checking environment", ie);
    }
  }

  /**
   * Start a new log segment in a BookKeeper ledger.
   * First ensure that we have the write lock for this journal.
   * Then create a ledger and stream based on that ledger.
   * The ledger id is written to the inprogress znode, so that in the
   * case of a crash, a recovery process can find the ledger we were writing
   * to when we crashed.
   *
   * @param txId First transaction id to be written to the stream
   * @param layoutVersion layout version recorded in the segment metadata
   * @return an output stream writing to the newly created ledger
   * @throws IOException if txId is not greater than the highest transaction
   *                     id already stored, if an inprogress segment already
   *                     exists, or on zookeeper/bookkeeper errors
   */
  @Override
  public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
      throws IOException {
    checkEnv();

    if (txId <= maxTxId.get()) {
      throw new IOException("We've already seen " + txId
          + ". A new stream cannot be created with it");
    }

    try {
      String existingInprogressNode = ci.read();
      if (null != existingInprogressNode
          && zkc.exists(existingInprogressNode, false) != null) {
        throw new IOException("Inprogress node already exists");
      }
      if (currentLedger != null) {
        // bookkeeper errored on last stream, clean up ledger
        currentLedger.close();
      }
      currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize,
                                       BookKeeper.DigestType.MAC,
                                       digestpw.getBytes(Charsets.UTF_8));
    } catch (BKException bke) {
      throw new IOException("Error creating ledger", bke);
    } catch (KeeperException ke) {
      throw new IOException("Error in zookeeper while creating ledger", ke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted creating ledger", ie);
    }

    try {
      String znodePath = inprogressZNode(txId);
      EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
          layoutVersion, currentLedger.getId(), txId);
      /* Write the ledger metadata out to the inprogress ledger znode
       * This can fail if for some reason our write lock has
       * expired (@see WriteLock) and another process has managed to
       * create the inprogress znode.
       * In this case, throw an exception. We don't want to continue
       * as this would lead to a split brain situation.
       */
      l.write(zkc, znodePath);

      maxTxId.store(txId);
      ci.update(znodePath);
      return new BookKeeperEditLogOutputStream(conf, currentLedger);
    } catch (KeeperException ke) {
      cleanupLedger(currentLedger);
      throw new IOException("Error storing ledger metadata", ke);
    }
  }

  /**
   * Close the given ledger and delete it from bookkeeper. Failures are
   * logged and otherwise ignored, as the caller is about to throw anyway.
   *
   * @param lh ledger to discard; nothing is done if it is null
   */
  private void cleanupLedger(LedgerHandle lh) {
    if (lh == null) {
      return;
    }
    try {
      long id = lh.getId();
      lh.close();
      bkc.deleteLedger(id);
    } catch (BKException bke) {
      //log & ignore, an IOException will be thrown soon
      LOG.error("Error closing ledger", bke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while closing ledger", ie);
    }
  }

  /**
   * Finalize a log segment. If the journal manager is currently
   * writing to a ledger, ensure that this is the ledger of the log segment
   * being finalized.
   *
   * Otherwise this is the recovery case. In the recovery case, ensure that
   * the firstTxId of the ledger matches firstTxId for the segment we are
   * trying to finalize.
   *
   * @param firstTxId first transaction id of the segment
   * @param lastTxId last transaction id of the segment
   * @throws IOException if the inprogress znode is missing, does not match
   *                     the active ledger or firstTxId, a finalized znode
   *                     with different data already exists, or on
   *                     zookeeper errors or interruption
   */
  @Override
  public void finalizeLogSegment(long firstTxId, long lastTxId)
      throws IOException {
    checkEnv();

    String inprogressPath = inprogressZNode(firstTxId);
    try {
      Stat inprogressStat = zkc.exists(inprogressPath, false);
      if (inprogressStat == null) {
        throw new IOException("Inprogress znode " + inprogressPath
                              + " doesn't exist");
      }

      EditLogLedgerMetadata l
        =  EditLogLedgerMetadata.read(zkc, inprogressPath);

      if (currentLedger != null) { // normal, non-recovery case
        if (l.getLedgerId() == currentLedger.getId()) {
          try {
            currentLedger.close();
          } catch (BKException bke) {
            // Best effort: finalization continues even if the close failed.
            LOG.error("Error closing current ledger", bke);
          }
          currentLedger = null;
        } else {
          throw new IOException(
              "Active ledger has different ID to inprogress. "
              + l.getLedgerId() + " found, "
              + currentLedger.getId() + " expected");
        }
      }

      if (l.getFirstTxId() != firstTxId) {
        throw new IOException("Transaction id not as expected, "
            + l.getFirstTxId() + " found, " + firstTxId + " expected");
      }

      l.finalizeLedger(lastTxId);
      String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
      try {
        l.write(zkc, finalisedPath);
      } catch (KeeperException.NodeExistsException nee) {
        // Tolerate an existing finalized znode only if it holds our data.
        if (!l.verify(zkc, finalisedPath)) {
          throw new IOException("Node " + finalisedPath + " already exists"
                                + " but data doesn't match");
        }
      }
      maxTxId.store(lastTxId);
      // Versioned delete: fails if the inprogress znode changed meanwhile.
      zkc.delete(inprogressPath, inprogressStat.getVersion());
      String inprogressPathFromCI = ci.read();
      if (inprogressPath.equals(inprogressPathFromCI)) {
        ci.clear();
      }
    } catch (KeeperException e) {
      throw new IOException("Error finalising ledger", e);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Error finalising ledger", ie);
    }
  }

  /**
   * Add to {@code streams} one input stream per consecutive segment,
   * starting with the segment containing {@code fromTxId}. Selection stops
   * at the first segment that does not contain the next expected
   * transaction id, or after a stream whose last transaction id is
   * {@link HdfsConstants#INVALID_TXID}.
   *
   * @param streams collection the selected streams are appended to
   * @param fromTxId first transaction id the caller wants to read
   * @param inProgressOk whether inprogress segments may be selected
   * @throws IOException if a ledger cannot be opened or read, or the thread
   *                     is interrupted
   */
  @Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk)
      throws IOException {
    List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
        inProgressOk);
    try {
      for (EditLogLedgerMetadata l : currentLedgerList) {
        long lastTxId = l.getLastTxId();
        if (l.isInProgress()) {
          lastTxId = recoverLastTxId(l, false);
        }
        // Check once again, required in case of InProgress and is case of any
        // gap.
        if (fromTxId < l.getFirstTxId() || fromTxId > lastTxId) {
          // If mismatches then there might be some gap, so we should not check
          // further.
          return;
        }

        LedgerHandle h;
        if (l.isInProgress()) { // we don't want to fence the current journal
          h = bkc.openLedgerNoRecovery(l.getLedgerId(),
              BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
        } else {
          h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
              digestpw.getBytes(Charsets.UTF_8));
        }
        BookKeeperEditLogInputStream elis
            = new BookKeeperEditLogInputStream(h, l);
        elis.skipTo(fromTxId);

        streams.add(elis);
        if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
          return;
        }
        fromTxId = elis.getLastTxId() + 1;
      }
    } catch (BKException e) {
      throw new IOException("Could not open ledger for " + fromTxId, e);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
    }
  }

  /**
   * Count the transactions available in consecutive segments, starting with
   * the segment that contains {@code fromTxId}. Counting stops at the first
   * gap after at least one segment was counted, or at an inprogress segment
   * whose last transaction id cannot be determined.
   *
   * NOTE(review): the segment containing fromTxId is counted in full, from
   * its first transaction id rather than from fromTxId. Confirm callers
   * expect that before changing it.
   *
   * @param fromTxId transaction id to start counting at
   * @param inProgressOk whether inprogress segments are considered
   * @return the number of transactions counted
   * @throws IOException if the first segment at or after fromTxId does not
   *                     start where expected (a CorruptionException), or if
   *                     the ledger list or a ledger cannot be read
   */
  long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
      throws IOException {
    long count = 0;
    long expectedStart = 0;
    for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
      long lastTxId = l.getLastTxId();
      if (l.isInProgress()) {
        lastTxId = recoverLastTxId(l, false);
        if (lastTxId == HdfsConstants.INVALID_TXID) {
          break;
        }
      }

      assert lastTxId >= l.getFirstTxId();

      if (lastTxId < fromTxId) {
        continue;
      } else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
        // we can start in the middle of a segment
        count = (lastTxId - l.getFirstTxId()) + 1;
        expectedStart = lastTxId + 1;
      } else {
        if (expectedStart != l.getFirstTxId()) {
          if (count == 0) {
            throw new CorruptionException("StartTxId " + l.getFirstTxId()
                + " is not as expected " + expectedStart
                + ". Gap in transaction log?");
          } else {
            break;
          }
        }
        count += (lastTxId - l.getFirstTxId()) + 1;
        expectedStart = lastTxId + 1;
      }
    }
    return count;
  }

  /**
   * Finalize every inprogress segment found under the ledgers znode. The
   * ledger of each segment is opened with recovery (fencing) to find its
   * last transaction id. Segments whose ledger is empty are deleted instead
   * of finalized.
   *
   * @throws IOException if a segment is unrecoverably corrupt, on zookeeper
   *                     errors, or if the thread is interrupted
   */
  @Override
  public void recoverUnfinalizedSegments() throws IOException {
    checkEnv();

    synchronized (this) {
      try {
        List<String> children = zkc.getChildren(ledgerPath, false);
        for (String child : children) {
          if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
            continue;
          }
          String znode = ledgerPath + "/" + child;
          EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode);
          try {
            long endTxId = recoverLastTxId(l, true);
            if (endTxId == HdfsConstants.INVALID_TXID) {
              LOG.error("Unrecoverable corruption has occurred in segment "
                  + l.toString() + " at path " + znode
                  + ". Unable to continue recovery.");
              throw new IOException("Unrecoverable corruption,"
                  + " please check logs.");
            }
            finalizeLogSegment(l.getFirstTxId(), endTxId);
          } catch (SegmentEmptyException see) {
            LOG.warn("Inprogress znode " + child
                + " refers to a ledger which is empty. This occurs when the NN"
                + " crashes after opening a segment, but before writing the"
                + " OP_START_LOG_SEGMENT op. It is safe to delete."
                + " MetaData [" + l.toString() + "]");

            // If the max seen transaction is the same as what would
            // have been the first transaction of the failed ledger,
            // decrement it, as that transaction never happened and as
            // such, is _not_ the last seen
            if (maxTxId.get() == l.getFirstTxId()) {
              maxTxId.reset(maxTxId.get() - 1);
            }

            zkc.delete(znode, -1);
          }
        }
      } catch (KeeperException.NoNodeException nne) {
          // nothing to recover, ignore
      } catch (KeeperException ke) {
        throw new IOException("Couldn't get list of inprogress segments", ke);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted getting list of inprogress segments",
                              ie);
      }
    }
  }

  /**
   * Delete every finalized segment whose last transaction id is lower than
   * {@code minTxIdToKeep}: first its znode, then its ledger. Failures on
   * individual segments are logged and do not stop the purge.
   *
   * @param minTxIdToKeep lowest transaction id that must be retained
   * @throws IOException if the environment check or the ledger listing fails
   */
  @Override
  public void purgeLogsOlderThan(long minTxIdToKeep)
      throws IOException {
    checkEnv();

    for (EditLogLedgerMetadata l : getLedgerList(false)) {
      if (l.getLastTxId() >= minTxIdToKeep) {
        continue;
      }
      try {
        Stat stat = zkc.exists(l.getZkPath(), false);
        if (stat == null) {
          // The znode vanished between listing and purging (e.g. another
          // process removed it); exists() reports that as null, not as an
          // exception, so guard against dereferencing it.
          LOG.warn("ZNode " + l.getZkPath()
              + " no longer exists; skipping purge of " + l);
          continue;
        }
        zkc.delete(l.getZkPath(), stat.getVersion());
        bkc.deleteLedger(l.getLedgerId());
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while purging " + l, ie);
      } catch (BKException bke) {
        LOG.error("Couldn't delete ledger from bookkeeper", bke);
      } catch (KeeperException ke) {
        LOG.error("Error deleting ledger entry in zookeeper", ke);
      }
    }
  }

  /** Not supported by this journal manager. */
  @Override
  public void discardSegments(long startTxId) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this journal manager. */
  @Override
  public void doPreUpgrade() throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this journal manager. */
  @Override
  public void doUpgrade(Storage storage) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this journal manager. */
  @Override
  public long getJournalCTime() throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this journal manager. */
  @Override
  public void doFinalize() throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this journal manager. */
  @Override
  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
      int targetLayoutVersion) throws IOException {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this journal manager. */
  @Override
  public void doRollback() throws IOException {
    throw new UnsupportedOperationException();
  }

  /**
   * Close the bookkeeper client and then the zookeeper client. The
   * zookeeper client is closed even if closing bookkeeper fails.
   *
   * @throws IOException if the bookkeeper client cannot be closed or the
   *                     thread is interrupted
   */
  @Override
  public void close() throws IOException {
    try {
      try {
        bkc.close();
      } finally {
        // Always release the zookeeper session, even when bkc.close() threw.
        zkc.close();
      }
    } catch (BKException bke) {
      throw new IOException("Couldn't close bookkeeper client", bke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while closing journal manager", ie);
    }
  }

  /**
   * Set the amount of memory that this stream should use to buffer edits.
   * Setting this will only affect future output streams. Streams
   * which have already been created won't be affected.
   *
   * The size is stored in the configuration that is handed to every output
   * stream created by {@link #startLogSegment(long, int)}.
   *
   * @param size buffer capacity to store under
   *             {@link #BKJM_OUTPUT_BUFFER_SIZE}
   */
  @Override
  public void setOutputBufferCapacity(int size) {
    // Was conf.getInt(...), which only read the value and discarded it.
    conf.setInt(BKJM_OUTPUT_BUFFER_SIZE, size);
  }

  /**
   * Find the id of the last edit log transaction written to an edit log
   * ledger.
   *
   * @param l metadata of the segment whose ledger is inspected
   * @param fence if true the ledger is opened with recovery, otherwise it
   *              is opened without recovery so a live writer is not fenced
   * @return the last transaction id of the leading run of consecutive
   *         transaction ids read from the ledger, or
   *         {@link HdfsConstants#INVALID_TXID} if no op could be read
   * @throws SegmentEmptyException if no entry was ever confirmed on the
   *                               ledger
   * @throws IOException if the ledger cannot be opened or read, or the
   *                     thread is interrupted
   */
  private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
      throws IOException, SegmentEmptyException {
    LedgerHandle lh = null;
    try {
      if (fence) {
        lh = bkc.openLedger(l.getLedgerId(),
                            BookKeeper.DigestType.MAC,
                            digestpw.getBytes(Charsets.UTF_8));
      } else {
        lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
                                      BookKeeper.DigestType.MAC,
                                      digestpw.getBytes(Charsets.UTF_8));
      }
    } catch (BKException bke) {
      throw new IOException("Exception opening ledger for " + l, bke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted opening ledger for " + l, ie);
    }

    BookKeeperEditLogInputStream in = null;

    try {
      long lastAddConfirmed = lh.getLastAddConfirmed();
      if (lastAddConfirmed == -1) {
        throw new SegmentEmptyException();
      }

      in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);

      long endTxId = HdfsConstants.INVALID_TXID;
      FSEditLogOp op = in.readOp();
      while (op != null) {
        // Only advance while transaction ids stay consecutive.
        if (endTxId == HdfsConstants.INVALID_TXID
            || op.getTransactionId() == endTxId+1) {
          endTxId = op.getTransactionId();
        }
        op = in.readOp();
      }
      return endTxId;
    } finally {
      if (in != null) {
        // NOTE(review): presumably the stream releases the ledger handle it
        // wraps when closed - confirm in BookKeeperEditLogInputStream.
        in.close();
      } else {
        // No stream was created (empty ledger or constructor failure), so
        // the handle opened above would otherwise never be closed.
        try {
          lh.close();
        } catch (BKException bke) {
          LOG.warn("Error closing ledger for " + l, bke);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          LOG.warn("Interrupted while closing ledger for " + l, ie);
        }
      }
    }
  }

  /**
   * Get a list of all segments in the journal.
   *
   * @param inProgressOk whether inprogress segments are included
   * @return the segments, sorted with
   *         {@link EditLogLedgerMetadata#COMPARATOR}
   * @throws IOException if the list cannot be read from zookeeper
   */
  List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
      throws IOException {
    return getLedgerList(-1, inProgressOk);
  }

  /**
   * Get the segments of the journal, leaving out finalized segments that
   * end before {@code fromTxId}.
   *
   * @param fromTxId finalized segments whose last transaction id is lower
   *                 than this are skipped
   * @param inProgressOk whether inprogress segments are included
   * @return the matching segments, sorted with
   *         {@link EditLogLedgerMetadata#COMPARATOR}
   * @throws IOException if the list cannot be read from zookeeper or the
   *                     thread is interrupted
   */
  private List<EditLogLedgerMetadata> getLedgerList(long fromTxId,
      boolean inProgressOk) throws IOException {
    List<EditLogLedgerMetadata> ledgers
      = new ArrayList<EditLogLedgerMetadata>();
    try {
      List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
      for (String ledgerName : ledgerNames) {
        if (!inProgressOk && ledgerName.startsWith(BKJM_EDIT_INPROGRESS)) {
          continue;
        }
        String legderMetadataPath = ledgerPath + "/" + ledgerName;
        try {
          EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
              .read(zkc, legderMetadataPath);
          if (editLogLedgerMetadata.getLastTxId() != HdfsConstants.INVALID_TXID
              && editLogLedgerMetadata.getLastTxId() < fromTxId) {
            // exclude already read closed edits, but include inprogress edits
            // as this will be handled in caller
            continue;
          }
          ledgers.add(editLogLedgerMetadata);
        } catch (KeeperException.NoNodeException e) {
          LOG.warn("ZNode: " + legderMetadataPath
              + " might have finalized and deleted."
              + " So ignoring NoNodeException.");
        }
      }
    } catch (KeeperException e) {
      throw new IOException("Exception reading ledger list from zk", e);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted getting list of ledgers from zk", ie);
    }

    Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
    return ledgers;
  }

  /**
   * Get the znode path for a finalized ledger.
   *
   * @param startTxId first transaction id of the segment
   * @param endTxId last transaction id of the segment
   * @return the path, with both ids zero-padded to 18 decimal digits
   */
  String finalizedLedgerZNode(long startTxId, long endTxId) {
    return String.format("%s/edits_%018d_%018d",
                         ledgerPath, startTxId, endTxId);
  }

  /**
   * Get the znode path for the inprogressZNode.
   *
   * @param startTxid first transaction id of the segment
   * @return the path, ending in the id written in hexadecimal
   */
  String inprogressZNode(long startTxid) {
    return ledgerPath + "/" + BKJM_EDIT_INPROGRESS
        + Long.toString(startTxid, 16);
  }

  /**
   * Replace the zookeeper client used by this journal manager. Test hook.
   *
   * @param zk the client to use from now on
   */
  @VisibleForTesting
  void setZooKeeper(ZooKeeper zk) {
    this.zkc = zk;
  }

  /**
   * Simple watcher to notify when zookeeper has connected
   */
  private class ZkConnectionWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
      if (Event.KeeperState.SyncConnected.equals(event.getState())) {
        zkConnectLatch.countDown();
      }
    }
  }

  /**
   * Thrown by recoverLastTxId when the ledger of a segment has no confirmed
   * entries.
   */
  private static class SegmentEmptyException extends IOException {
    private static final long serialVersionUID = 1L;
  }
}