1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.contrib.bkjournal;
19 
20 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
21 import org.apache.hadoop.hdfs.server.common.Storage;
22 import org.apache.hadoop.hdfs.server.common.StorageInfo;
23 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
24 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
25 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
26 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
27 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
28 import org.apache.hadoop.conf.Configuration;
29 
30 import org.apache.bookkeeper.conf.ClientConfiguration;
31 import org.apache.bookkeeper.client.BKException;
32 import org.apache.bookkeeper.client.BookKeeper;
33 import org.apache.bookkeeper.client.LedgerHandle;
34 import org.apache.bookkeeper.util.ZkUtils;
35 
36 import org.apache.zookeeper.data.Stat;
37 import org.apache.zookeeper.ZooKeeper;
38 import org.apache.zookeeper.Watcher;
39 import org.apache.zookeeper.WatchedEvent;
40 import org.apache.zookeeper.KeeperException;
41 import org.apache.zookeeper.CreateMode;
42 import org.apache.zookeeper.ZooDefs.Ids;
43 import org.apache.zookeeper.AsyncCallback.StringCallback;
44 import org.apache.zookeeper.ZKUtil;
45 
46 import java.util.Collection;
47 import java.util.Collections;
48 import java.util.ArrayList;
49 import java.util.List;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicBoolean;
53 import java.io.IOException;
54 
55 import java.net.URI;
56 
57 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
58 import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
59 import com.google.protobuf.TextFormat;
60 import static com.google.common.base.Charsets.UTF_8;
61 
62 import org.apache.commons.io.Charsets;
63 import org.apache.commons.logging.Log;
64 import org.apache.commons.logging.LogFactory;
65 import com.google.common.annotations.VisibleForTesting;
66 /**
67  * BookKeeper Journal Manager
68  *
69  * To use, add the following to hdfs-site.xml.
70  * <pre>
71  * {@code
72  * <property>
73  *   <name>dfs.namenode.edits.dir</name>
74  *   <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
75  * </property>
76  *
77  * <property>
78  *   <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
79  *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
80  * </property>
81  * }
82  * </pre>
 * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode].
 * [zkEnsemble] is a list of semicolon-separated ZooKeeper host:port
 * pairs. In the example above there are 3 servers in the ensemble,
 * zk1, zk2 &amp; zk3, each one listening on port 2181.
 *
 * [rootZnode] is the path of the ZooKeeper znode under which the edit log
 * information will be stored.
90  *
91  * Other configuration options are:
92  * <ul>
93  *   <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
94  *       Number of bytes a bookkeeper journal stream will buffer before
95  *       forcing a flush. Default is 1024.</li>
96  *   <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
97  *       Number of bookkeeper servers in edit log ledger ensembles. This
98  *       is the number of bookkeeper servers which need to be available
99  *       for the ledger to be writable. Default is 3.</li>
100  *   <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
101  *       Number of bookkeeper servers in the write quorum. This is the
102  *       number of bookkeeper servers which must have acknowledged the
103  *       write of an entry before it is considered written.
104  *       Default is 2.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
 *       Password used when creating and opening ledgers. Default is an
 *       empty string.</li>
 *   <li><b>dfs.namenode.bookkeeperjournal.zk.session.timeout</b>
 *       Session timeout, in milliseconds, for the ZooKeeper client used by
 *       the BookKeeper Journal Manager. Hadoop recommends that this value be
 *       less than the ZKFC session timeout. Default value is 3000.</li>
111  * </ul>
112  */
113 public class BookKeeperJournalManager implements JournalManager {
114   static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
115 
116   public static final String BKJM_OUTPUT_BUFFER_SIZE
117     = "dfs.namenode.bookkeeperjournal.output-buffer-size";
118   public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
119 
120   public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
121     = "dfs.namenode.bookkeeperjournal.ensemble-size";
122   public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
123 
124  public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
125     = "dfs.namenode.bookkeeperjournal.quorum-size";
126   public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
127 
128   public static final String BKJM_BOOKKEEPER_DIGEST_PW
129     = "dfs.namenode.bookkeeperjournal.digestPw";
130   public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
131 
132   private static final int BKJM_LAYOUT_VERSION = -1;
133 
134   public static final String BKJM_ZK_SESSION_TIMEOUT
135     = "dfs.namenode.bookkeeperjournal.zk.session.timeout";
136   public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
137 
138   private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
139 
140   public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH
141     = "dfs.namenode.bookkeeperjournal.zk.availablebookies";
142 
143   public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
144     = "/ledgers/available";
145 
146   public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
147     = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
148   public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
149     = 2000;
150 
151   public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
152     = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
153   public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
154 
155   public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE
156     = "dfs.namenode.bookkeeperjournal.ack.quorum-size";
157 
158   public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC
159     = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
160   public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;
161 
162   private ZooKeeper zkc;
163   private final Configuration conf;
164   private final BookKeeper bkc;
165   private final CurrentInprogress ci;
166   private final String basePath;
167   private final String ledgerPath;
168   private final String versionPath;
169   private final MaxTxId maxTxId;
170   private final int ensembleSize;
171   private final int quorumSize;
172   private final int ackQuorumSize;
173   private final int addEntryTimeout;
174   private final String digestpw;
175   private final int speculativeReadTimeout;
176   private final int readEntryTimeout;
177   private final CountDownLatch zkConnectLatch;
178   private final NamespaceInfo nsInfo;
179   private boolean initialized = false;
180   private LedgerHandle currentLedger = null;
181 
182   /**
183    * Construct a Bookkeeper journal manager.
184    */
BookKeeperJournalManager(Configuration conf, URI uri, NamespaceInfo nsInfo)185   public BookKeeperJournalManager(Configuration conf, URI uri,
186       NamespaceInfo nsInfo) throws IOException {
187     this.conf = conf;
188     this.nsInfo = nsInfo;
189 
190     String zkConnect = uri.getAuthority().replace(";", ",");
191     basePath = uri.getPath();
192     ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
193                                BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
194     quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
195                              BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
196     ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
197     addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
198                              BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT);
199     speculativeReadTimeout = conf.getInt(
200                              BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
201                              BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
202     readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
203                              BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);
204 
205     ledgerPath = basePath + "/ledgers";
206     String maxTxIdPath = basePath + "/maxtxid";
207     String currentInprogressNodePath = basePath + "/CurrentInprogress";
208     versionPath = basePath + "/version";
209     digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
210                         BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
211 
212     try {
213       zkConnectLatch = new CountDownLatch(1);
214       int bkjmZKSessionTimeout = conf.getInt(BKJM_ZK_SESSION_TIMEOUT,
215           BKJM_ZK_SESSION_TIMEOUT_DEFAULT);
216       zkc = new ZooKeeper(zkConnect, bkjmZKSessionTimeout,
217           new ZkConnectionWatcher());
218       // Configured zk session timeout + some extra grace period (here
219       // BKJM_ZK_SESSION_TIMEOUT_DEFAULT used as grace period)
220       int zkConnectionLatchTimeout = bkjmZKSessionTimeout
221           + BKJM_ZK_SESSION_TIMEOUT_DEFAULT;
222       if (!zkConnectLatch
223           .await(zkConnectionLatchTimeout, TimeUnit.MILLISECONDS)) {
224         throw new IOException("Error connecting to zookeeper");
225       }
226 
227       prepareBookKeeperEnv();
228       ClientConfiguration clientConf = new ClientConfiguration();
229       clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
230       clientConf.setReadEntryTimeout(readEntryTimeout);
231       clientConf.setAddEntryTimeout(addEntryTimeout);
232       bkc = new BookKeeper(clientConf, zkc);
233     } catch (KeeperException e) {
234       throw new IOException("Error initializing zk", e);
235     } catch (InterruptedException ie) {
236       Thread.currentThread().interrupt();
237       throw new IOException("Interrupted while initializing bk journal manager",
238                             ie);
239     }
240 
241     ci = new CurrentInprogress(zkc, currentInprogressNodePath);
242     maxTxId = new MaxTxId(zkc, maxTxIdPath);
243   }
244 
245   /**
246    * Pre-creating bookkeeper metadata path in zookeeper.
247    */
prepareBookKeeperEnv()248   private void prepareBookKeeperEnv() throws IOException {
249     // create bookie available path in zookeeper if it doesn't exists
250     final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH,
251         BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
252     final CountDownLatch zkPathLatch = new CountDownLatch(1);
253 
254     final AtomicBoolean success = new AtomicBoolean(false);
255     StringCallback callback = new StringCallback() {
256       @Override
257       public void processResult(int rc, String path, Object ctx, String name) {
258         if (KeeperException.Code.OK.intValue() == rc
259             || KeeperException.Code.NODEEXISTS.intValue() == rc) {
260           LOG.info("Successfully created bookie available path : "
261               + zkAvailablePath);
262           success.set(true);
263         } else {
264           KeeperException.Code code = KeeperException.Code.get(rc);
265           LOG.error("Error : "
266                   + KeeperException.create(code, path).getMessage()
267                   + ", failed to create bookie available path : "
268                   + zkAvailablePath);
269         }
270         zkPathLatch.countDown();
271       }
272     };
273     ZkUtils.asyncCreateFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
274         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
275 
276     try {
277       if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)
278           || !success.get()) {
279         throw new IOException("Couldn't create bookie available path :"
280             + zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
281             + " millis");
282       }
283     } catch (InterruptedException e) {
284       Thread.currentThread().interrupt();
285       throw new IOException(
286           "Interrupted when creating the bookie available path : "
287               + zkAvailablePath, e);
288     }
289   }
290 
291   @Override
format(NamespaceInfo ns)292   public void format(NamespaceInfo ns) throws IOException {
293     try {
294       // delete old info
295       Stat baseStat = null;
296       Stat ledgerStat = null;
297       if ((baseStat = zkc.exists(basePath, false)) != null) {
298         if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) {
299           for (EditLogLedgerMetadata l : getLedgerList(true)) {
300             try {
301               bkc.deleteLedger(l.getLedgerId());
302             } catch (BKException.BKNoSuchLedgerExistsException bke) {
303               LOG.warn("Ledger " + l.getLedgerId() + " does not exist;"
304                        + " Cannot delete.");
305             }
306           }
307         }
308         ZKUtil.deleteRecursive(zkc, basePath);
309       }
310 
311       // should be clean now.
312       zkc.create(basePath, new byte[] {'0'},
313           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
314 
315       VersionProto.Builder builder = VersionProto.newBuilder();
316       builder.setNamespaceInfo(PBHelper.convert(ns))
317         .setLayoutVersion(BKJM_LAYOUT_VERSION);
318 
319       byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
320       zkc.create(versionPath, data,
321                  Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
322 
323       zkc.create(ledgerPath, new byte[] {'0'},
324                  Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
325     } catch (KeeperException ke) {
326       LOG.error("Error accessing zookeeper to format", ke);
327       throw new IOException("Error accessing zookeeper to format", ke);
328     } catch (InterruptedException ie) {
329       Thread.currentThread().interrupt();
330       throw new IOException("Interrupted during format", ie);
331     } catch (BKException bke) {
332       throw new IOException("Error cleaning up ledgers during format", bke);
333     }
334   }
335 
336   @Override
hasSomeData()337   public boolean hasSomeData() throws IOException {
338     try {
339       return zkc.exists(basePath, false) != null;
340     } catch (KeeperException ke) {
341       throw new IOException("Couldn't contact zookeeper", ke);
342     } catch (InterruptedException ie) {
343       Thread.currentThread().interrupt();
344       throw new IOException("Interrupted while checking for data", ie);
345     }
346   }
347 
checkEnv()348   synchronized private void checkEnv() throws IOException {
349     if (!initialized) {
350       try {
351         Stat versionStat = zkc.exists(versionPath, false);
352         if (versionStat == null) {
353           throw new IOException("Environment not initialized. "
354                                 +"Have you forgotten to format?");
355         }
356         byte[] d = zkc.getData(versionPath, false, versionStat);
357 
358         VersionProto.Builder builder = VersionProto.newBuilder();
359         TextFormat.merge(new String(d, UTF_8), builder);
360         if (!builder.isInitialized()) {
361           throw new IOException("Invalid/Incomplete data in znode");
362         }
363         VersionProto vp = builder.build();
364 
365         // There's only one version at the moment
366         assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
367 
368         NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
369 
370         if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
371             !nsInfo.clusterID.equals(readns.getClusterID()) ||
372             !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
373           String err = String.format("Environment mismatch. Running process %s"
374                                      +", stored in ZK %s", nsInfo, readns);
375           LOG.error(err);
376           throw new IOException(err);
377         }
378 
379         ci.init();
380         initialized = true;
381       } catch (KeeperException ke) {
382         throw new IOException("Cannot access ZooKeeper", ke);
383       } catch (InterruptedException ie) {
384         Thread.currentThread().interrupt();
385         throw new IOException("Interrupted while checking environment", ie);
386       }
387     }
388   }
389 
390   /**
391    * Start a new log segment in a BookKeeper ledger.
392    * First ensure that we have the write lock for this journal.
393    * Then create a ledger and stream based on that ledger.
394    * The ledger id is written to the inprogress znode, so that in the
395    * case of a crash, a recovery process can find the ledger we were writing
396    * to when we crashed.
397    * @param txId First transaction id to be written to the stream
398    */
399   @Override
startLogSegment(long txId, int layoutVersion)400   public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
401       throws IOException {
402     checkEnv();
403 
404     if (txId <= maxTxId.get()) {
405       throw new IOException("We've already seen " + txId
406           + ". A new stream cannot be created with it");
407     }
408 
409     try {
410       String existingInprogressNode = ci.read();
411       if (null != existingInprogressNode
412           && zkc.exists(existingInprogressNode, false) != null) {
413         throw new IOException("Inprogress node already exists");
414       }
415       if (currentLedger != null) {
416         // bookkeeper errored on last stream, clean up ledger
417         currentLedger.close();
418       }
419       currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize,
420                                        BookKeeper.DigestType.MAC,
421                                        digestpw.getBytes(Charsets.UTF_8));
422     } catch (BKException bke) {
423       throw new IOException("Error creating ledger", bke);
424     } catch (KeeperException ke) {
425       throw new IOException("Error in zookeeper while creating ledger", ke);
426     } catch (InterruptedException ie) {
427       Thread.currentThread().interrupt();
428       throw new IOException("Interrupted creating ledger", ie);
429     }
430 
431     try {
432       String znodePath = inprogressZNode(txId);
433       EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
434           layoutVersion, currentLedger.getId(), txId);
435       /* Write the ledger metadata out to the inprogress ledger znode
436        * This can fail if for some reason our write lock has
437        * expired (@see WriteLock) and another process has managed to
438        * create the inprogress znode.
439        * In this case, throw an exception. We don't want to continue
440        * as this would lead to a split brain situation.
441        */
442       l.write(zkc, znodePath);
443 
444       maxTxId.store(txId);
445       ci.update(znodePath);
446       return new BookKeeperEditLogOutputStream(conf, currentLedger);
447     } catch (KeeperException ke) {
448       cleanupLedger(currentLedger);
449       throw new IOException("Error storing ledger metadata", ke);
450     }
451   }
452 
cleanupLedger(LedgerHandle lh)453   private void cleanupLedger(LedgerHandle lh) {
454     try {
455       long id = currentLedger.getId();
456       currentLedger.close();
457       bkc.deleteLedger(id);
458     } catch (BKException bke) {
459       //log & ignore, an IOException will be thrown soon
460       LOG.error("Error closing ledger", bke);
461     } catch (InterruptedException ie) {
462       Thread.currentThread().interrupt();
463       LOG.warn("Interrupted while closing ledger", ie);
464     }
465   }
466 
467 
468 
469   /**
470    * Finalize a log segment. If the journal manager is currently
471    * writing to a ledger, ensure that this is the ledger of the log segment
472    * being finalized.
473    *
474    * Otherwise this is the recovery case. In the recovery case, ensure that
475    * the firstTxId of the ledger matches firstTxId for the segment we are
476    * trying to finalize.
477    */
478   @Override
finalizeLogSegment(long firstTxId, long lastTxId)479   public void finalizeLogSegment(long firstTxId, long lastTxId)
480       throws IOException {
481     checkEnv();
482 
483     String inprogressPath = inprogressZNode(firstTxId);
484     try {
485       Stat inprogressStat = zkc.exists(inprogressPath, false);
486       if (inprogressStat == null) {
487         throw new IOException("Inprogress znode " + inprogressPath
488                               + " doesn't exist");
489       }
490 
491       EditLogLedgerMetadata l
492         =  EditLogLedgerMetadata.read(zkc, inprogressPath);
493 
494       if (currentLedger != null) { // normal, non-recovery case
495         if (l.getLedgerId() == currentLedger.getId()) {
496           try {
497             currentLedger.close();
498           } catch (BKException bke) {
499             LOG.error("Error closing current ledger", bke);
500           }
501           currentLedger = null;
502         } else {
503           throw new IOException(
504               "Active ledger has different ID to inprogress. "
505               + l.getLedgerId() + " found, "
506               + currentLedger.getId() + " expected");
507         }
508       }
509 
510       if (l.getFirstTxId() != firstTxId) {
511         throw new IOException("Transaction id not as expected, "
512             + l.getFirstTxId() + " found, " + firstTxId + " expected");
513       }
514 
515       l.finalizeLedger(lastTxId);
516       String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
517       try {
518         l.write(zkc, finalisedPath);
519       } catch (KeeperException.NodeExistsException nee) {
520         if (!l.verify(zkc, finalisedPath)) {
521           throw new IOException("Node " + finalisedPath + " already exists"
522                                 + " but data doesn't match");
523         }
524       }
525       maxTxId.store(lastTxId);
526       zkc.delete(inprogressPath, inprogressStat.getVersion());
527       String inprogressPathFromCI = ci.read();
528       if (inprogressPath.equals(inprogressPathFromCI)) {
529         ci.clear();
530       }
531     } catch (KeeperException e) {
532       throw new IOException("Error finalising ledger", e);
533     } catch (InterruptedException ie) {
534       Thread.currentThread().interrupt();
535       throw new IOException("Error finalising ledger", ie);
536     }
537   }
538 
539   @Override
selectInputStreams(Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk)540   public void selectInputStreams(Collection<EditLogInputStream> streams,
541       long fromTxId, boolean inProgressOk)
542       throws IOException {
543     List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
544         inProgressOk);
545     try {
546       BookKeeperEditLogInputStream elis = null;
547       for (EditLogLedgerMetadata l : currentLedgerList) {
548         long lastTxId = l.getLastTxId();
549         if (l.isInProgress()) {
550           lastTxId = recoverLastTxId(l, false);
551         }
552         // Check once again, required in case of InProgress and is case of any
553         // gap.
554         if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
555           LedgerHandle h;
556           if (l.isInProgress()) { // we don't want to fence the current journal
557             h = bkc.openLedgerNoRecovery(l.getLedgerId(),
558                 BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
559           } else {
560             h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
561                 digestpw.getBytes(Charsets.UTF_8));
562           }
563           elis = new BookKeeperEditLogInputStream(h, l);
564           elis.skipTo(fromTxId);
565         } else {
566           // If mismatches then there might be some gap, so we should not check
567           // further.
568           return;
569         }
570         streams.add(elis);
571         if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
572           return;
573         }
574         fromTxId = elis.getLastTxId() + 1;
575       }
576     } catch (BKException e) {
577       throw new IOException("Could not open ledger for " + fromTxId, e);
578     } catch (InterruptedException ie) {
579       Thread.currentThread().interrupt();
580       throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
581     }
582   }
583 
getNumberOfTransactions(long fromTxId, boolean inProgressOk)584   long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
585       throws IOException {
586     long count = 0;
587     long expectedStart = 0;
588     for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
589       long lastTxId = l.getLastTxId();
590       if (l.isInProgress()) {
591         lastTxId = recoverLastTxId(l, false);
592         if (lastTxId == HdfsConstants.INVALID_TXID) {
593           break;
594         }
595       }
596 
597       assert lastTxId >= l.getFirstTxId();
598 
599       if (lastTxId < fromTxId) {
600         continue;
601       } else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
602         // we can start in the middle of a segment
603         count = (lastTxId - l.getFirstTxId()) + 1;
604         expectedStart = lastTxId + 1;
605       } else {
606         if (expectedStart != l.getFirstTxId()) {
607           if (count == 0) {
608             throw new CorruptionException("StartTxId " + l.getFirstTxId()
609                 + " is not as expected " + expectedStart
610                 + ". Gap in transaction log?");
611           } else {
612             break;
613           }
614         }
615         count += (lastTxId - l.getFirstTxId()) + 1;
616         expectedStart = lastTxId + 1;
617       }
618     }
619     return count;
620   }
621 
622   @Override
recoverUnfinalizedSegments()623   public void recoverUnfinalizedSegments() throws IOException {
624     checkEnv();
625 
626     synchronized (this) {
627       try {
628         List<String> children = zkc.getChildren(ledgerPath, false);
629         for (String child : children) {
630           if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
631             continue;
632           }
633           String znode = ledgerPath + "/" + child;
634           EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode);
635           try {
636             long endTxId = recoverLastTxId(l, true);
637             if (endTxId == HdfsConstants.INVALID_TXID) {
638               LOG.error("Unrecoverable corruption has occurred in segment "
639                   + l.toString() + " at path " + znode
640                   + ". Unable to continue recovery.");
641               throw new IOException("Unrecoverable corruption,"
642                   + " please check logs.");
643             }
644             finalizeLogSegment(l.getFirstTxId(), endTxId);
645           } catch (SegmentEmptyException see) {
646             LOG.warn("Inprogress znode " + child
647                 + " refers to a ledger which is empty. This occurs when the NN"
648                 + " crashes after opening a segment, but before writing the"
649                 + " OP_START_LOG_SEGMENT op. It is safe to delete."
650                 + " MetaData [" + l.toString() + "]");
651 
652             // If the max seen transaction is the same as what would
653             // have been the first transaction of the failed ledger,
654             // decrement it, as that transaction never happened and as
655             // such, is _not_ the last seen
656             if (maxTxId.get() == l.getFirstTxId()) {
657               maxTxId.reset(maxTxId.get() - 1);
658             }
659 
660             zkc.delete(znode, -1);
661           }
662         }
663       } catch (KeeperException.NoNodeException nne) {
664           // nothing to recover, ignore
665       } catch (KeeperException ke) {
666         throw new IOException("Couldn't get list of inprogress segments", ke);
667       } catch (InterruptedException ie) {
668         Thread.currentThread().interrupt();
669         throw new IOException("Interrupted getting list of inprogress segments",
670                               ie);
671       }
672     }
673   }
674 
675   @Override
purgeLogsOlderThan(long minTxIdToKeep)676   public void purgeLogsOlderThan(long minTxIdToKeep)
677       throws IOException {
678     checkEnv();
679 
680     for (EditLogLedgerMetadata l : getLedgerList(false)) {
681       if (l.getLastTxId() < minTxIdToKeep) {
682         try {
683           Stat stat = zkc.exists(l.getZkPath(), false);
684           zkc.delete(l.getZkPath(), stat.getVersion());
685           bkc.deleteLedger(l.getLedgerId());
686         } catch (InterruptedException ie) {
687           Thread.currentThread().interrupt();
688           LOG.error("Interrupted while purging " + l, ie);
689         } catch (BKException bke) {
690           LOG.error("Couldn't delete ledger from bookkeeper", bke);
691         } catch (KeeperException ke) {
692           LOG.error("Error deleting ledger entry in zookeeper", ke);
693         }
694       }
695     }
696   }
697 
698   @Override
discardSegments(long startTxId)699   public void discardSegments(long startTxId) throws IOException {
700     throw new UnsupportedOperationException();
701   }
702 
703   @Override
doPreUpgrade()704   public void doPreUpgrade() throws IOException {
705     throw new UnsupportedOperationException();
706   }
707 
708   @Override
doUpgrade(Storage storage)709   public void doUpgrade(Storage storage) throws IOException {
710     throw new UnsupportedOperationException();
711   }
712 
713   @Override
getJournalCTime()714   public long getJournalCTime() throws IOException {
715     throw new UnsupportedOperationException();
716   }
717 
718   @Override
doFinalize()719   public void doFinalize() throws IOException {
720     throw new UnsupportedOperationException();
721   }
722 
723   @Override
canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion)724   public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
725       int targetLayoutVersion) throws IOException {
726     throw new UnsupportedOperationException();
727   }
728 
729   @Override
doRollback()730   public void doRollback() throws IOException {
731     throw new UnsupportedOperationException();
732   }
733 
734   @Override
close()735   public void close() throws IOException {
736     try {
737       bkc.close();
738       zkc.close();
739     } catch (BKException bke) {
740       throw new IOException("Couldn't close bookkeeper client", bke);
741     } catch (InterruptedException ie) {
742       Thread.currentThread().interrupt();
743       throw new IOException("Interrupted while closing journal manager", ie);
744     }
745   }
746 
747   /**
748    * Set the amount of memory that this stream should use to buffer edits.
749    * Setting this will only affect future output stream. Streams
750    * which have currently be created won't be affected.
751    */
752   @Override
setOutputBufferCapacity(int size)753   public void setOutputBufferCapacity(int size) {
754     conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size);
755   }
756 
757   /**
758    * Find the id of the last edit log transaction writen to a edit log
759    * ledger.
760    */
recoverLastTxId(EditLogLedgerMetadata l, boolean fence)761   private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
762       throws IOException, SegmentEmptyException {
763     LedgerHandle lh = null;
764     try {
765       if (fence) {
766         lh = bkc.openLedger(l.getLedgerId(),
767                             BookKeeper.DigestType.MAC,
768                             digestpw.getBytes(Charsets.UTF_8));
769       } else {
770         lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
771                                       BookKeeper.DigestType.MAC,
772                                       digestpw.getBytes(Charsets.UTF_8));
773       }
774     } catch (BKException bke) {
775       throw new IOException("Exception opening ledger for " + l, bke);
776     } catch (InterruptedException ie) {
777       Thread.currentThread().interrupt();
778       throw new IOException("Interrupted opening ledger for " + l, ie);
779     }
780 
781     BookKeeperEditLogInputStream in = null;
782 
783     try {
784       long lastAddConfirmed = lh.getLastAddConfirmed();
785       if (lastAddConfirmed == -1) {
786         throw new SegmentEmptyException();
787       }
788 
789       in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
790 
791       long endTxId = HdfsConstants.INVALID_TXID;
792       FSEditLogOp op = in.readOp();
793       while (op != null) {
794         if (endTxId == HdfsConstants.INVALID_TXID
795             || op.getTransactionId() == endTxId+1) {
796           endTxId = op.getTransactionId();
797         }
798         op = in.readOp();
799       }
800       return endTxId;
801     } finally {
802       if (in != null) {
803         in.close();
804       }
805     }
806   }
807 
808   /**
809    * Get a list of all segments in the journal.
810    */
getLedgerList(boolean inProgressOk)811   List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
812       throws IOException {
813     return getLedgerList(-1, inProgressOk);
814   }
815 
getLedgerList(long fromTxId, boolean inProgressOk)816   private List<EditLogLedgerMetadata> getLedgerList(long fromTxId,
817       boolean inProgressOk) throws IOException {
818     List<EditLogLedgerMetadata> ledgers
819       = new ArrayList<EditLogLedgerMetadata>();
820     try {
821       List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
822       for (String ledgerName : ledgerNames) {
823         if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) {
824           continue;
825         }
826         String legderMetadataPath = ledgerPath + "/" + ledgerName;
827         try {
828           EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
829               .read(zkc, legderMetadataPath);
830           if (editLogLedgerMetadata.getLastTxId() != HdfsConstants.INVALID_TXID
831               && editLogLedgerMetadata.getLastTxId() < fromTxId) {
832             // exclude already read closed edits, but include inprogress edits
833             // as this will be handled in caller
834             continue;
835           }
836           ledgers.add(editLogLedgerMetadata);
837         } catch (KeeperException.NoNodeException e) {
838           LOG.warn("ZNode: " + legderMetadataPath
839               + " might have finalized and deleted."
840               + " So ignoring NoNodeException.");
841         }
842       }
843     } catch (KeeperException e) {
844       throw new IOException("Exception reading ledger list from zk", e);
845     } catch (InterruptedException ie) {
846       Thread.currentThread().interrupt();
847       throw new IOException("Interrupted getting list of ledgers from zk", ie);
848     }
849 
850     Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
851     return ledgers;
852   }
853 
854   /**
855    * Get the znode path for a finalize ledger
856    */
finalizedLedgerZNode(long startTxId, long endTxId)857   String finalizedLedgerZNode(long startTxId, long endTxId) {
858     return String.format("%s/edits_%018d_%018d",
859                          ledgerPath, startTxId, endTxId);
860   }
861 
862   /**
863    * Get the znode path for the inprogressZNode
864    */
inprogressZNode(long startTxid)865   String inprogressZNode(long startTxid) {
866     return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
867   }
868 
869   @VisibleForTesting
setZooKeeper(ZooKeeper zk)870   void setZooKeeper(ZooKeeper zk) {
871     this.zkc = zk;
872   }
873 
874   /**
875    * Simple watcher to notify when zookeeper has connected
876    */
877   private class ZkConnectionWatcher implements Watcher {
process(WatchedEvent event)878     public void process(WatchedEvent event) {
879       if (Event.KeeperState.SyncConnected.equals(event.getState())) {
880         zkConnectLatch.countDown();
881       }
882     }
883   }
884 
885   private static class SegmentEmptyException extends IOException {
886   }
887 }
888