1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hbase.regionserver.wal;
19 
20 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
21 
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.io.OutputStream;
26 import java.lang.management.ManagementFactory;
27 import java.lang.management.MemoryUsage;
28 import java.lang.reflect.InvocationTargetException;
29 import java.net.URLEncoder;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Comparator;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.NavigableMap;
36 import java.util.Set;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ConcurrentSkipListMap;
40 import java.util.concurrent.CopyOnWriteArrayList;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.LinkedBlockingQueue;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.concurrent.atomic.AtomicLong;
50 import java.util.concurrent.locks.ReentrantLock;
51 
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.fs.FSDataOutputStream;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.fs.PathFilter;
60 import org.apache.hadoop.hbase.Cell;
61 import org.apache.hadoop.hbase.CellUtil;
62 import org.apache.hadoop.hbase.HBaseConfiguration;
63 import org.apache.hadoop.hbase.HConstants;
64 import org.apache.hadoop.hbase.HRegionInfo;
65 import org.apache.hadoop.hbase.HTableDescriptor;
66 import org.apache.hadoop.hbase.classification.InterfaceAudience;
67 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
68 import org.apache.hadoop.hbase.util.Bytes;
69 import org.apache.hadoop.hbase.util.ClassSize;
70 import org.apache.hadoop.hbase.util.DrainBarrier;
71 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
72 import org.apache.hadoop.hbase.util.FSUtils;
73 import org.apache.hadoop.hbase.util.HasThread;
74 import org.apache.hadoop.hbase.util.Threads;
75 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
76 import org.apache.hadoop.hbase.wal.WAL;
77 import org.apache.hadoop.hbase.wal.WALFactory;
78 import org.apache.hadoop.hbase.wal.WALKey;
79 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
80 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
81 import org.apache.hadoop.hbase.wal.WALSplitter;
82 import org.apache.hadoop.hdfs.DFSOutputStream;
83 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
84 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
85 import org.apache.hadoop.util.StringUtils;
86 import org.apache.htrace.NullScope;
87 import org.apache.htrace.Span;
88 import org.apache.htrace.Trace;
89 import org.apache.htrace.TraceScope;
90 
91 import com.google.common.annotations.VisibleForTesting;
92 import com.lmax.disruptor.BlockingWaitStrategy;
93 import com.lmax.disruptor.EventHandler;
94 import com.lmax.disruptor.ExceptionHandler;
95 import com.lmax.disruptor.LifecycleAware;
96 import com.lmax.disruptor.TimeoutException;
97 import com.lmax.disruptor.dsl.Disruptor;
98 import com.lmax.disruptor.dsl.ProducerType;
99 
100 /**
101  * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
102  * Only one WAL is ever being written at a time.  When a WAL hits a configured maximum size,
103  * it is rolled.  This is done internal to the implementation.
104  *
105  * <p>As data is flushed from the MemStore to other on-disk structures (files sorted by
106  * key, hfiles), a WAL becomes obsolete. We can let go of all the log edits/entries for a given
107  * HRegion-sequence id.  A bunch of work in the below is done keeping account of these region
108  * sequence ids -- what is flushed out to hfiles, and what is yet in WAL and in memory only.
109  *
110  * <p>It is only practical to delete entire files. Thus, we delete an entire on-disk file
111  * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
112  * (smaller) than the most-recent flush.
113  *
114  * <p>To read a WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem,
115  * org.apache.hadoop.fs.Path)}.
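 *
 * <p>A minimal sketch of reading a WAL file with that factory method; the loop assumes only the
 * {@link WAL.Reader} next()/close() contract, and the variable names here are hypothetical:
 * <pre>
 *   WAL.Reader reader = walFactory.createReader(fs, walPath);
 *   try {
 *     for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
 *       WALKey key = entry.getKey();      // region, table and sequence id of the edit
 *       WALEdit edit = entry.getEdit();   // the Cells appended under that key
 *     }
 *   } finally {
 *     reader.close();
 *   }
 * </pre>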
116  *
117  * <h2>Failure Semantics</h2>
118  * If an exception occurs on append or sync, we roll the WAL because the current WAL is now a
119  * lame duck; any further appends or syncs will also fail with the same original exception. If we
120  * have made successful appends to the WAL and are then unable to sync them, our current semantic
121  * is to return an error to the client that the appends failed but also to abort the current
122  * context, usually the hosting server. We need to replay the WALs. TODO: Change this semantic. A
123  * roll of the WAL may be sufficient as long as we have flagged the client that the append failed.
124  * TODO: replication may pick up these last edits though they have been marked as failed appends
125  * (we need to keep our own file lengths, not rely on HDFS).
126  */
127 @InterfaceAudience.Private
128 public class FSHLog implements WAL {
129   // IMPLEMENTATION NOTES:
130   //
131   // At the core is a ring buffer.  Our ring buffer is the LMAX Disruptor.  It tries to
132   // minimize synchronizations and volatile writes when multiple threads contend, as is the case
133   // here with handlers appending and syncing on a single WAL.  The Disruptor is configured to handle multiple
134   // producers but it has one consumer only (the producers in HBase are IPC Handlers calling append
135   // and then sync).  The single consumer/writer pulls the appends and syncs off the ring buffer.
136   // When a handler calls sync, it is given back a future. The producer 'blocks' on the future so
137   // it does not return until the sync completes.  The future is passed over the ring buffer from
138   // the producer/handler to the consumer thread where it does its best to batch up the producer
139   // syncs so one WAL sync actually spans multiple producer sync invocations.  How well the
140   // batching works depends on the write rate; i.e. we tend to batch more in times of
141   // high writes/syncs.
142   //
143   // Calls to append now also wait until the append has been done on the consumer side of the
144   // disruptor.  We used to not wait but it makes the implementation easier to grok if we have
145   // the region edit/sequence id after the append returns.
146   //
147   // TODO: Handlers need to coordinate appending AND syncing.  Can we have the threads contend
148   // once only?  Probably hard given syncs take way longer than an append.
149   //
150   // The consumer threads pass the syncs off to multiple syncing threads in a round robin fashion
151   // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long pole writing the
152   // WAL).  The consumer thread passes the futures to the sync threads, which complete
153   // the futures when the sync is done.
154   //
155   // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer.  It
156   // acts as a sort-of transaction id.  It is always incrementing.
157   //
158   // The RingBufferEventHandler class hosts the ring buffer consuming code.  The threads that
159   // do the actual FS sync are implementations of SyncRunner.  SafePointZigZagLatch is a
160   // synchronization class used to halt the consumer at a safe point --  just after all outstanding
161   // syncs and appends have completed -- so the log roller can swap the WAL out under it.
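  //
  // A hedged sketch of the producer-side calling pattern described above, as seen from a handler
  // thread (the method names are those of the WAL interface this class implements; txid is the
  // ring buffer sequence that append hands back):
  //
  //   long txid = wal.append(htd, regionInfo, walKey, walEdit, true); // rides the ring buffer
  //   wal.sync(txid); // blocks on a SyncFuture until a SyncRunner completes the FS sync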
162 
163   private static final Log LOG = LogFactory.getLog(FSHLog.class);
164 
165   private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
166 
167   /**
168    * The nexus at which all incoming handlers meet.  Does appends and sync with an ordering.
169    * Appends and syncs are each put on the ring which means handlers need to
170    * smash up against the ring twice (can we make it once only? ... maybe not since time to append
171    * is so different from time to sync and sometimes we don't want to sync or we want to async
172    * the sync).  The ring is where we make sure of our ordering and it is also where we do
173    * batching up of handler sync calls.
174    */
175   private final Disruptor<RingBufferTruck> disruptor;
176 
177   /**
178    * An executorservice that runs the disruptor AppendEventHandler append executor.
179    */
180   private final ExecutorService appendExecutor;
181 
182   /**
183    * This fellow is run by the above appendExecutor service but it is all about batching up appends
184    * and syncs; it may shutdown without cleaning out the last few appends or syncs.  To guard
185    * against this, keep a reference to this handler and do an explicit close on the way out to
186    * make sure everything is flushed out before we exit.
187    */
188   private final RingBufferEventHandler ringBufferEventHandler;
189 
190   /**
191    * Map of {@link SyncFuture}s keyed by Handler objects.  Used so we reuse SyncFutures.
192    * TODO: Reuse FSWALEntry's rather than create them anew each time as we do SyncFutures here.
193    * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them
194    * get them from this Map?
195    */
196   private final Map<Thread, SyncFuture> syncFuturesByHandler;
197 
198   /**
199    * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
200    * ring buffer sequence.  Maintained by the ring buffer consumer.
201    */
202   private volatile long highestUnsyncedSequence = -1;
203 
204   /**
205    * Updated to the ring buffer sequence of the last successful sync call.  This can be less than
206    * {@link #highestUnsyncedSequence} for case where we have an append where a sync has not yet
207    * come in for it.  Maintained by the syncing threads.
208    */
209   private final AtomicLong highestSyncedSequence = new AtomicLong(0);
210 
211   /**
212    * file system instance
213    */
214   protected final FileSystem fs;
215 
216   /**
217    * WAL directory, where all WAL files are placed.
218    */
219   private final Path fullPathLogDir;
220 
221   /**
222    * dir path where old logs are kept.
223    */
224   private final Path fullPathArchiveDir;
225 
226   /**
227    * Matches just those wal files that belong to this wal instance.
228    */
229   private final PathFilter ourFiles;
230 
231   /**
232    * Prefix of a WAL file, usually the region server name it is hosted on.
233    */
234   private final String logFilePrefix;
235 
236   /**
237    * Suffix included on generated wal file names
238    */
239   private final String logFileSuffix;
240 
241   /**
242    * Prefix used when checking for wal membership.
243    */
244   private final String prefixPathStr;
245 
246   private final WALCoprocessorHost coprocessorHost;
247 
248   /**
249    * conf object
250    */
251   protected final Configuration conf;
252 
253   /** Listeners that are called on WAL events. */
254   private final List<WALActionsListener> listeners =
255     new CopyOnWriteArrayList<WALActionsListener>();
256 
257   @Override
258   public void registerWALActionsListener(final WALActionsListener listener) {
259     this.listeners.add(listener);
260   }
261 
262   @Override
263   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
264     return this.listeners.remove(listener);
265   }
266 
267   @Override
268   public WALCoprocessorHost getCoprocessorHost() {
269     return coprocessorHost;
270   }
271 
272   /**
273    * FSDataOutputStream associated with the current SequenceFile.writer
274    */
275   private FSDataOutputStream hdfs_out;
276 
277   // All about log rolling if not enough replicas outstanding.
278 
279   // Minimum tolerable replicas; if the actual value is lower than this, rollWriter will be triggered
280   private final int minTolerableReplication;
281 
282   private final int slowSyncNs;
283 
284   // If the live datanode count is lower than the default replicas value,
285   // rollWriter will be triggered in each sync (so the roll would be
286   // triggered repeatedly in a short time). Use this as a workaround to slow
287   // down the roll frequency triggered by checkLowReplication().
288   private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
289 
290   private final int lowReplicationRollLimit;
291 
292   // If consecutiveLogRolls is larger than lowReplicationRollLimit,
293   // then disable the rolling in checkLowReplication().
294   // Re-enable it if replication recovers.
295   private volatile boolean lowReplicationRollEnabled = true;
296 
297   /**
298    * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding
299    * sequence id as yet not flushed as well as the most recent edit sequence id appended to the
300    * WAL. Has facility for answering questions such as "Is it safe to GC a WAL?".
301    */
302   private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
303 
304   /**
305    * Current log file.
306    */
307   volatile Writer writer;
308 
309   /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
310   private final DrainBarrier closeBarrier = new DrainBarrier();
311 
312   /**
313    * This lock makes sure only one log roll runs at a time. Should not be taken while any other
314    * lock is held. We don't just use synchronized because that results in bogus and tedious
315    * findbugs warning when it thinks synchronized controls writer thread safety.  It is held when
316    * we are actually rolling the log.  It is checked when we are looking to see if we should roll
317    * the log or not.
318    */
319   private final ReentrantLock rollWriterLock = new ReentrantLock(true);
320 
321   private volatile boolean closed = false;
322   private final AtomicBoolean shutdown = new AtomicBoolean(false);
323 
324   // The timestamp (in ms) when the log file was created.
325   private final AtomicLong filenum = new AtomicLong(-1);
326 
327   // Number of transactions in the current Wal.
328   private final AtomicInteger numEntries = new AtomicInteger(0);
329 
330   // If > than this size, roll the log.
331   private final long logrollsize;
332 
333   /**
334    * The total size of wal
335    */
336   private AtomicLong totalLogSize = new AtomicLong(0);
337 
338   /*
339    * If more than this many logs, force a flush of the oldest region so that its oldest edit
340    * goes to disk.  If there are too many logs and we crash, replaying them will take forever.
341    * Keep the number of logs tidy.
342    */
343   private final int maxLogs;
344 
345   /** Number of log close errors tolerated before we abort */
346   private final int closeErrorsTolerated;
347 
348   private final AtomicInteger closeErrorCount = new AtomicInteger();
349 
350 
351   /**
352    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
353    * Throws an IllegalArgumentException if used to compare paths from different wals.
354    */
355   final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
356     @Override
357     public int compare(Path o1, Path o2) {
358       long t1 = getFileNumFromFileName(o1);
359       long t2 = getFileNumFromFileName(o2);
360       if (t1 == t2) return 0;
361       return (t1 > t2) ? 1 : -1;
362     }
363   };
364 
365   /**
366    * Map of WAL log file to the latest sequence ids of all regions it has entries of.
367    * The map is sorted by the log file creation timestamp (contained in the log file name).
368    */
369   private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
370     new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
371 
372   /**
373    * Exception handler to pass to the disruptor ringbuffer.  Same as the native implementation
374    * except it logs using our logger instead of the java native logger.
375    */
376   static class RingBufferExceptionHandler implements ExceptionHandler {
377     @Override
378     public void handleEventException(Throwable ex, long sequence, Object event) {
379       LOG.error("Sequence=" + sequence + ", event=" + event, ex);
380       throw new RuntimeException(ex);
381     }
382 
383     @Override
384     public void handleOnStartException(Throwable ex) {
385       LOG.error(ex);
386       throw new RuntimeException(ex);
387     }
388 
389     @Override
390     public void handleOnShutdownException(Throwable ex) {
391       LOG.error(ex);
392       throw new RuntimeException(ex);
393     }
394   }
395 
396   /**
397    * Constructor.
398    *
399    * @param fs filesystem handle
400    * @param root path for stored and archived wals
401    * @param logDir dir where wals are stored
402    * @param conf configuration to use
403    * @throws IOException
404    */
405   public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
406       throws IOException {
407     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
408   }
409 
410   /**
411    * Create an edit log at the given <code>dir</code> location.
412    *
413    * You should never have to load an existing log. If there is a log at
414    * startup, it should have already been processed and deleted by the time the
415    * WAL object is started up.
416    *
417    * @param fs filesystem handle
418    * @param rootDir path to where logs and oldlogs are kept
419    * @param logDir dir where wals are stored
420    * @param archiveDir dir where wals are archived
421    * @param conf configuration to use
422    * @param listeners Listeners on WAL events. Listeners passed here will
423    * be registered before we do anything else; e.g. before the
424    * constructor calls {@link #rollWriter()}.
425    * @param failIfWALExists If true IOException will be thrown if files related to this wal
426    *        already exist.
427    * @param prefix should always be hostname and port in distributed env and
428    *        it will be URL encoded before being used.
429    *        If prefix is null, "wal" will be used
430    * @param suffix will be url encoded. null is treated as empty. non-empty must start with
431    *        {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER}
432    * @throws IOException
433    */
434   public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
435       final String archiveDir, final Configuration conf,
436       final List<WALActionsListener> listeners,
437       final boolean failIfWALExists, final String prefix, final String suffix)
438       throws IOException {
439     this.fs = fs;
440     this.fullPathLogDir = new Path(rootDir, logDir);
441     this.fullPathArchiveDir = new Path(rootDir, archiveDir);
442     this.conf = conf;
443 
444     if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
445       throw new IOException("Unable to mkdir " + fullPathLogDir);
446     }
447 
448     if (!fs.exists(this.fullPathArchiveDir)) {
449       if (!fs.mkdirs(this.fullPathArchiveDir)) {
450         throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
451       }
452     }
453 
454     // If prefix is null||empty then just name it wal
455     this.logFilePrefix =
456       prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
457     // we only correctly differentiate suffixes when numeric ones start with '.'
458     if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
459       throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
460           "' but instead was '" + suffix + "'");
461     }
462     // Now that it exists, set the storage policy for the entire directory of wal files related to
463     // this FSHLog instance
464     FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY,
465       HConstants.DEFAULT_WAL_STORAGE_POLICY);
466     this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
467     this.prefixPathStr = new Path(fullPathLogDir,
468         logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
469 
470     this.ourFiles = new PathFilter() {
471       @Override
472       public boolean accept(final Path fileName) {
473         // The path should start with dir/<prefix> and end with our suffix
474         final String fileNameString = fileName.toString();
475         if (!fileNameString.startsWith(prefixPathStr)) {
476           return false;
477         }
478         if (logFileSuffix.isEmpty()) {
479           // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
480           return org.apache.commons.lang.StringUtils.isNumeric(
481               fileNameString.substring(prefixPathStr.length()));
482         } else if (!fileNameString.endsWith(logFileSuffix)) {
483           return false;
484         }
485         return true;
486       }
487     };
488 
489     if (failIfWALExists) {
490       final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
491       if (null != walFiles && 0 != walFiles.length) {
492         throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
493       }
494     }
495 
496     // Register listeners.  TODO: Should this exist anymore?  We have CPs?
497     if (listeners != null) {
498       for (WALActionsListener i: listeners) {
499         registerWALActionsListener(i);
500       }
501     }
502     this.coprocessorHost = new WALCoprocessorHost(this, conf);
503 
504     // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
505     // (it costs a little x'ing blocks)
506     final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
507         FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
508     this.logrollsize =
509       (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
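    // For illustration (assuming the defaults above rather than values read from a live config):
    // a 128 MB HDFS block size with the 0.95 multiplier gives a roll size of roughly 121.6 MB.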
510 
511     float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY,
512       conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY,
513         HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
514     boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
515     if(maxLogsDefined){
516       LOG.warn("'hbase.regionserver.maxlogs' was deprecated.");
517     }
518     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
519         Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
520     this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
521         FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
522     this.lowReplicationRollLimit =
523       conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
524     this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
525     int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
526 
527     LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
528       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
529       ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
530       this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
531 
532     // rollWriter sets this.hdfs_out if it can.
533     rollWriter();
534 
535     this.slowSyncNs =
536         1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
537           DEFAULT_SLOW_SYNC_TIME_MS);
538 
539     // This is the 'writer' -- a single threaded executor.  This single thread 'consumes' what is
540     // put on the ring buffer.
541     String hostingThreadName = Thread.currentThread().getName();
542     this.appendExecutor = Executors.
543       newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
544     // Preallocate objects to use on the ring buffer.  The way that appends and syncs work, we will
545     // be stuck and make no progress if the buffer is filled with appends only and there is no
546     // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
547     // before they return.
548     final int preallocatedEventCount =
549       this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
550     // Using BlockingWaitStrategy.  Stuff that is going on here takes so long it makes no sense
551     // spinning as other strategies do.
552     this.disruptor =
553       new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
554         this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
555     // Advance the ring buffer sequence so that it starts from 1 instead of 0,
556     // because SyncFuture.NOT_DONE = 0.
557     this.disruptor.getRingBuffer().next();
558     this.ringBufferEventHandler =
559       new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
560         maxHandlersCount);
561     this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
562     this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
563     // Presize our map of SyncFutures by handler objects.
564     this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
565     // Starting up threads in constructor is a no no; Interface should have an init call.
566     this.disruptor.start();
567   }
568 
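  /**
   * Back-of-envelope illustration of the heuristic below (the numbers are hypothetical, not read
   * from configuration): with a 16 GB max heap, memstoreSizeRatio = 0.4 and a logRollSize of
   * roughly 121.6 MB, maxLogs = round(16384 MB * 0.4 * 2 / 121.6 MB) = ~108 rolled WALs, before
   * the Math.max(32, ...) floor and any "hbase.regionserver.maxlogs" override in the constructor
   * are applied.
   */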
569   private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) {
570     MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
571     int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize);
572     return maxLogs;
573   }
574 
575   /**
576    * Get the backing files associated with this WAL.
577    * @return may be null if there are no files.
578    */
579   protected FileStatus[] getFiles() throws IOException {
580     return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
581   }
582 
583   /**
584    * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate
585    * the default behavior (such as setting the maxRecoveryErrorCount value, for example; see
586    * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the
587    * underlying HDFS OutputStream.
588    * NOTE: This could be removed once Hadoop1 support is removed.
589    * @return null if underlying stream is not ready.
590    */
591   @VisibleForTesting
592   OutputStream getOutputStream() {
593     FSDataOutputStream fsdos = this.hdfs_out;
594     if (fsdos == null) return null;
595     return fsdos.getWrappedStream();
596   }
597 
598   @Override
599   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
600     return rollWriter(false);
601   }
602 
603   /**
604    * Retrieve the next path to use for writing.
605    * Increments the internal filenum.
606    */
607   private Path getNewPath() throws IOException {
608     this.filenum.set(System.currentTimeMillis());
609     Path newPath = getCurrentFileName();
610     while (fs.exists(newPath)) {
611       this.filenum.incrementAndGet();
612       newPath = getCurrentFileName();
613     }
614     return newPath;
615   }
616 
617   Path getOldPath() {
618     long currentFilenum = this.filenum.get();
619     Path oldPath = null;
620     if (currentFilenum > 0) {
621       // ComputeFilename  will take care of meta wal filename
622       oldPath = computeFilename(currentFilenum);
623     } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
624     return oldPath;
625   }
626 
627   /**
628    * Tell listeners about pre log roll.
629    * @throws IOException
630    */
631   private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
632   throws IOException {
633     if (!this.listeners.isEmpty()) {
634       for (WALActionsListener i : this.listeners) {
635         i.preLogRoll(oldPath, newPath);
636       }
637     }
638   }
639 
640   /**
641    * Tell listeners about post log roll.
642    * @throws IOException
643    */
644   private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
645   throws IOException {
646     if (!this.listeners.isEmpty()) {
647       for (WALActionsListener i : this.listeners) {
648         i.postLogRoll(oldPath, newPath);
649       }
650     }
651   }
652 
653   /**
654    * Run a sync after opening to set up the pipeline.
655    * @param nextWriter
657    */
658   private void preemptiveSync(final ProtobufLogWriter nextWriter) {
659     long startTimeNanos = System.nanoTime();
660     try {
661       nextWriter.sync();
662       postSync(System.nanoTime() - startTimeNanos, 0);
663     } catch (IOException e) {
664       // optimization failed, no need to abort here.
665       LOG.warn("pre-sync failed but an optimization so keep going", e);
666     }
667   }
668 
669   @Override
670   public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
671     rollWriterLock.lock();
672     try {
673       // Return if nothing to flush.
674       if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
675       byte [][] regionsToFlush = null;
676       if (this.closed) {
677         LOG.debug("WAL closed. Skipping rolling of writer");
678         return regionsToFlush;
679       }
680       if (!closeBarrier.beginOp()) {
681         LOG.debug("WAL closing. Skipping rolling of writer");
682         return regionsToFlush;
683       }
684       TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
685       try {
686         Path oldPath = getOldPath();
687         Path newPath = getNewPath();
688         // Any exception from here on is catastrophic, non-recoverable so we currently abort.
689         Writer nextWriter = this.createWriterInstance(newPath);
690         FSDataOutputStream nextHdfsOut = null;
691         if (nextWriter instanceof ProtobufLogWriter) {
692           nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
693           // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline.
694           // If this fails, we just keep going.... it is an optimization, not the end of the world.
695           preemptiveSync((ProtobufLogWriter)nextWriter);
696         }
697         tellListenersAboutPreLogRoll(oldPath, newPath);
698         // NewPath could be equal to oldPath if replaceWriter fails.
699         newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
700         tellListenersAboutPostLogRoll(oldPath, newPath);
701         // Can we delete any of the old log files?
702         if (getNumRolledLogFiles() > 0) {
703           cleanOldLogs();
704           regionsToFlush = findRegionsToForceFlush();
705         }
706       } finally {
707         closeBarrier.endOp();
708         assert scope == NullScope.INSTANCE || !scope.isDetached();
709         scope.close();
710       }
711       return regionsToFlush;
712     } finally {
713       rollWriterLock.unlock();
714     }
715   }
716 
717   /**
718    * This method allows subclasses to inject different writers without having to
719    * extend other methods like rollWriter().
720    *
721    * @return Writer instance
722    */
723   protected Writer createWriterInstance(final Path path) throws IOException {
724     return DefaultWALProvider.createWriter(conf, fs, path, false);
725   }
726 
727   /**
728    * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
729    * @throws IOException
730    */
731   private void cleanOldLogs() throws IOException {
732     List<Path> logsToArchive = null;
733     // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
734     // are older than what is currently in memory, the WAL can be GC'd.
735     for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) {
736       Path log = e.getKey();
737       Map<byte[], Long> sequenceNums = e.getValue();
738       if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
739         if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
740         logsToArchive.add(log);
741         if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
742       }
743     }
744     if (logsToArchive != null) {
745       for (Path p : logsToArchive) {
746         this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
747         archiveLogFile(p);
748         this.byWalRegionSequenceIds.remove(p);
749       }
750     }
751   }
752 
753   /**
754    * If the number of un-archived WAL files is greater than the maximum allowed, check the first
755    * (oldest) WAL file, and return those regions which should be flushed so that it can
756    * be archived.
757    * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
758    * @throws IOException
759    */
760   byte[][] findRegionsToForceFlush() throws IOException {
761     byte [][] regions = null;
762     int logCount = getNumRolledLogFiles();
763     if (logCount > this.maxLogs && logCount > 0) {
764       Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
765         this.byWalRegionSequenceIds.firstEntry();
766       regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
767     }
768     if (regions != null) {
769       StringBuilder sb = new StringBuilder();
770       for (int i = 0; i < regions.length; i++) {
771         if (i > 0) sb.append(", ");
772         sb.append(Bytes.toStringBinary(regions[i]));
773       }
774       LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
775         "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
776     }
777     return regions;
778   }
779 
780   /**
781    * Used to manufacture race condition reliably. For testing only.
782    * @see #beforeWaitOnSafePoint()
783    */
784   @VisibleForTesting
785   protected void afterCreatingZigZagLatch() {}
786 
787   /**
788    * @see #afterCreatingZigZagLatch()
789    */
790   @VisibleForTesting
791   protected void beforeWaitOnSafePoint() {};
792 
793   /**
794    * Cleans up current writer closing it and then puts in place the passed in
795    * <code>nextWriter</code>.
796    *
797    * In the case of creating a new WAL, oldPath will be null.
798    *
799    * In the case of rolling over from one file to the next, none of the params will be null.
800    *
801    * In the case of closing out this FSHLog with no further use, newPath, nextWriter, and
802    * nextHdfsOut will be null.
803    *
804    * @param oldPath may be null
805    * @param newPath may be null
806    * @param nextWriter may be null
807    * @param nextHdfsOut may be null
808    * @return the passed in <code>newPath</code>
809    * @throws IOException if there is a problem flushing or closing the underlying FS
810    */
811   Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
812       final FSDataOutputStream nextHdfsOut)
813   throws IOException {
814     // Ask the ring buffer writer to pause at a safe point.  Once we do this, the writer
815     // thread will eventually pause. An error hereafter needs to release the writer thread
816     // regardless -- hence the finally block below.  Note, this method is called from the FSHLog
817     // constructor BEFORE the ring buffer is set running so it is null on first time through
818     // here; allow for that.
819     SyncFuture syncFuture = null;
820     SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
821       null: this.ringBufferEventHandler.attainSafePoint();
822     afterCreatingZigZagLatch();
823     TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
824     try {
825       // Wait on the safe point to be achieved.  Send in a sync in case nothing has hit the
826       // ring buffer between the above notification of writer that we want it to go to
827       // 'safe point' and then here where we are waiting on it to attain safe point.  Use
828       // 'sendSync' instead of 'sync' because we do not want this thread to block waiting on it
829       // to come back.  Cleanup this syncFuture down below after we are ready to run again.
830       try {
831         if (zigzagLatch != null) {
832           Trace.addTimelineAnnotation("awaiting safepoint");
833           syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
834         }
835       } catch (FailedSyncBeforeLogCloseException e) {
836         // If there are unflushed/unsynced entries on close, it is reason to abort.
837         if (isUnflushedEntries()) throw e;
838         LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
839           e.getMessage());
840       }
841 
842       // It is at the safe point.  Swap out writer from under the blocked writer thread.
843       // TODO: This close is inline with the critical section.  Should it happen in the background?
844       try {
845         if (this.writer != null) {
846           Trace.addTimelineAnnotation("closing writer");
847           this.writer.close();
848           Trace.addTimelineAnnotation("writer closed");
849         }
850         this.closeErrorCount.set(0);
851       } catch (IOException ioe) {
852         int errors = closeErrorCount.incrementAndGet();
853         if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
854           LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
855             ioe.getMessage() + "\", errors=" + errors +
856             "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
857         } else {
858           throw ioe;
859         }
860       }
861       this.writer = nextWriter;
862       this.hdfs_out = nextHdfsOut;
863       int oldNumEntries = this.numEntries.get();
864       this.numEntries.set(0);
865       final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
866       if (oldPath != null) {
867         this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
868         long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
869         this.totalLogSize.addAndGet(oldFileLen);
870         LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
871           ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
872           newPathString);
873       } else {
874         LOG.info("New WAL " + newPathString);
875       }
876     } catch (InterruptedException ie) {
877       // Perpetuate the interrupt
878       Thread.currentThread().interrupt();
879     } catch (IOException e) {
880       long count = getUnflushedEntriesCount();
881       LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
882       throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
883     } finally {
884       try {
885         // Let the writer thread go regardless, whether error or not.
886         if (zigzagLatch != null) {
887           zigzagLatch.releaseSafePoint();
888           // syncFuture will be null if we failed our wait on safe point above. Otherwise, if the
889           // latch was obtained successfully, the sync we threw in either triggered the latch or it
890           // got stamped with an exception because the WAL was damaged and we could not sync. Now that
891           // the write pipeline has been opened up again by releasing the safe point, process the
892           // syncFuture we got above. This is probably a noop but it may be a stale exception from
893           // when the old WAL was in place. Catch it if so.
894           if (syncFuture != null) {
895             try {
896               blockOnSync(syncFuture);
897             } catch (IOException ioe) {
898               if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe);
899             }
900           }
901         }
902       } finally {
903         scope.close();
904       }
905     }
906     return newPath;
907   }
908 
909   long getUnflushedEntriesCount() {
910     long highestSynced = this.highestSyncedSequence.get();
911     return highestSynced > this.highestUnsyncedSequence?
912       0: this.highestUnsyncedSequence - highestSynced;
913   }
914 
915   boolean isUnflushedEntries() {
916     return getUnflushedEntriesCount() > 0;
917   }
918 
919   /*
920    * Only public so WALSplitter can use it.
921    * @return archived location of a WAL file with the given path p
922    */
923   public static Path getWALArchivePath(Path archiveDir, Path p) {
924     return new Path(archiveDir, p.getName());
925   }
926 
927   private void archiveLogFile(final Path p) throws IOException {
928     Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
929     // Tell our listeners that a log is going to be archived.
930     if (!this.listeners.isEmpty()) {
931       for (WALActionsListener i : this.listeners) {
932         i.preLogArchive(p, newPath);
933       }
934     }
935     LOG.info("Archiving " + p + " to " + newPath);
936     if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
937       throw new IOException("Unable to rename " + p + " to " + newPath);
938     }
939     // Tell our listeners that a log has been archived.
940     if (!this.listeners.isEmpty()) {
941       for (WALActionsListener i : this.listeners) {
942         i.postLogArchive(p, newPath);
943       }
944     }
945   }
946 
947   /**
948    * This is a convenience method that computes a new filename with a given
949    * file-number.
950    * @param filenum to use
951    * @return Path
952    */
953   protected Path computeFilename(final long filenum) {
954     if (filenum < 0) {
955       throw new RuntimeException("WAL file number can't be < 0");
956     }
957     String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
958     return new Path(fullPathLogDir, child);
959   }
960 
961   /**
962    * This is a convenience method that computes a new filename
963    * using the current WAL file-number.
964    * @return Path
965    */
966   public Path getCurrentFileName() {
967     return computeFilename(this.filenum.get());
968   }
969 
970   @Override
971   public String toString() {
972     return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
973   }
974 
975 /**
976  * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}).
977  * This helper method returns the creation timestamp from a given log file.
978  * It extracts the timestamp assuming the filename is created with the
979  * {@link #computeFilename(long filenum)} method.
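 * <p>For illustration (a hypothetical name, not taken from a running cluster): a WAL file named
 * <code>host%2C16020%2C1473861565505.1473962340503</code> under this WAL's directory yields
 * <code>1473962340503</code>, its creation timestamp in ms.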
980  * @param fileName
981  * @return timestamp, as in the log file name.
982  */
983   protected long getFileNumFromFileName(Path fileName) {
984     if (fileName == null) throw new IllegalArgumentException("file name can't be null");
985     if (!ourFiles.accept(fileName)) {
986       throw new IllegalArgumentException("The log file " + fileName +
987           " doesn't belong to this WAL. (" + toString() + ")");
988     }
989     final String fileNameString = fileName.toString();
990     String chompedPath = fileNameString.substring(prefixPathStr.length(),
991         (fileNameString.length() - logFileSuffix.length()));
992     return Long.parseLong(chompedPath);
993   }
994 
995   @Override
996   public void close() throws IOException {
997     shutdown();
998     final FileStatus[] files = getFiles();
999     if (null != files && 0 != files.length) {
1000       for (FileStatus file : files) {
1001         Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
1002         // Tell our listeners that a log is going to be archived.
1003         if (!this.listeners.isEmpty()) {
1004           for (WALActionsListener i : this.listeners) {
1005             i.preLogArchive(file.getPath(), p);
1006           }
1007         }
1008 
1009         if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1010           throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1011         }
1012         // Tell our listeners that a log was archived.
1013         if (!this.listeners.isEmpty()) {
1014           for (WALActionsListener i : this.listeners) {
1015             i.postLogArchive(file.getPath(), p);
1016           }
1017         }
1018       }
1019       LOG.debug("Moved " + files.length + " WAL file(s) to " +
1020         FSUtils.getPath(this.fullPathArchiveDir));
1021     }
1022     LOG.info("Closed WAL: " + toString());
1023   }
1024 
1025   @Override
1026   public void shutdown() throws IOException {
1027     if (shutdown.compareAndSet(false, true)) {
1028       try {
1029         // Prevent all further flushing and rolling.
1030         closeBarrier.stopAndDrainOps();
1031       } catch (InterruptedException e) {
1032         LOG.error("Exception while waiting for cache flushes and log rolls", e);
1033         Thread.currentThread().interrupt();
1034       }
1035 
1036       // Shutdown the disruptor.  Will stop after all entries have been processed.  Make sure we
1037       // have stopped incoming appends before calling this else it will not shutdown.  We are
1038       // conservative below waiting a long time and if not elapsed, then halting.
1039       if (this.disruptor != null) {
1040         long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
1041         try {
1042           this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
1043         } catch (TimeoutException e) {
1044           LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
1045             "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
1046           this.disruptor.halt();
1047           this.disruptor.shutdown();
1048         }
1049       }
1050       // With disruptor down, this is safe to let go.
1051       if (this.appendExecutor !=  null) this.appendExecutor.shutdown();
1052 
1053       // Tell our listeners that the log is closing
1054       if (!this.listeners.isEmpty()) {
1055         for (WALActionsListener i : this.listeners) {
1056           i.logCloseRequested();
1057         }
1058       }
1059       this.closed = true;
1060       if (LOG.isDebugEnabled()) {
1061         LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
1062       }
1063       if (this.writer != null) {
1064         this.writer.close();
1065         this.writer = null;
1066       }
1067     }
1068   }
1069 
1070   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
1071       justification="Will never be null")
1072   @Override
1073   public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
1074       final WALEdit edits, final boolean inMemstore) throws IOException {
1075     if (this.closed) throw new IOException("Cannot append; log is closed");
1076     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
1077     // single consuming thread.  Don't have to worry about it.
1078     TraceScope scope = Trace.startSpan("FSHLog.append");
1079 
1080     // This is crazy how much it takes to make an edit.  Do we need all this stuff!!!!????  We need
1081     // all this to make a key and then below to append the edit, we need to carry htd, info,
1082     // etc. all over the ring buffer.
1083     FSWALEntry entry = null;
1084     long sequence = this.disruptor.getRingBuffer().next();
1085     try {
1086       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1087       // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
1088       entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
1089       truck.loadPayload(entry, scope.detach());
1090     } finally {
1091       this.disruptor.getRingBuffer().publish(sequence);
1092     }
1093     return sequence;
1094   }
1095 
1096   /**
1097    * Thread that runs the hdfs sync call. This call takes a while to complete.  This is the longest
1098    * pole adding edits to the WAL and this must complete to be sure all edits persisted.  We run
1099    * multiple threads sync'ng rather than one that just syncs in series so we have better
1100    * latencies; otherwise, an edit that arrived just after a sync started, might have to wait
1101    * almost the length of two sync invocations before it is marked done.
1102    * <p>When the sync completes, it marks all the passed in futures done.  On the other end of the
1103    * sync future is a blocked thread, usually a regionserver Handler.  There may be more than one
1104    * future passed in the case where a few threads arrive at about the same time and all invoke
1105    * 'sync'.  In this case we'll batch up the invocations and run one filesystem sync only for a
1106    * batch of Handler sync invocations.  Do not confuse these Handler SyncFutures with the futures
1107    * an ExecutorService returns when you call submit. We have no use for these in this model. These
1108    * SyncFutures are 'artificial', something to hold the Handler until the filesystem sync
1109    * completes.
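   * <p>An illustrative (hypothetical) sequence of the batching described above: handlers H1, H2
   * and H3 call sync() within a few microseconds of each other; the ring buffer consumer hands
   * their three SyncFutures to one SyncRunner via offer(), tagged with the highest ring sequence
   * it has appended; that SyncRunner issues a single writer.sync() and then marks all three
   * futures done so the blocked handlers can return.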
1110    */
1111   private class SyncRunner extends HasThread {
1112     private volatile long sequence;
1113     // Keep around last exception thrown. Clear on successful sync.
1114     private final BlockingQueue<SyncFuture> syncFutures;
1115 
1116     /**
1117      * UPDATE!
1118      * @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
1119      * we will put the result of the actual hdfs sync call as the result.
1120      * @param sequence The sequence number on the ring buffer when this thread was set running.
1121      * If this actual writer sync completes then all appends up this point have been
1122      * flushed/synced/pushed to datanodes.  If we fail, then the passed in <code>syncs</code>
1123      * futures will return the exception to their clients; some of the edits may have made it out
1124      * to data nodes but we will report all that were part of this session as failed.
1125      */
1126     SyncRunner(final String name, final int maxHandlersCount) {
1127       super(name);
1128       // LinkedBlockingQueue because of
1129       // http://www.javacodegeeks.com/2010/09/java-best-practices-queue-battle-and.html
1130       // Could use other blockingqueues here or concurrent queues.
1131       //
1132       // We could let the capacity be 'open' but bound it so we get alerted in pathological case
1133       // where we cannot sync and we have a bunch of threads all backed up waiting on their syncs
1134       // to come in.  LinkedBlockingQueue actually shrinks when you remove elements so Q should
1135       // stay neat and tidy in usual case.  Let the max size be three times the maximum handlers.
1136       // The passed in maxHandlerCount is the user-level handlers which is what we put up most of
1137       // but HBase has other handlers running too -- opening region handlers which want to write
1138       // the meta table when successful (i.e. sync), closing handlers -- etc.  These are usually
1139       // much fewer in number than the user-space handlers so Q-size should be user handlers plus
1140       // some space for these other handlers.  Let's multiply by 3 for good measure.
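      // For example, with the maxHandlersCount of 200 used as the fallback earlier in the FSHLog
      // constructor, this queue is capped at 600 queued SyncFutures.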
1141       this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
1142     }
1143 
1144     void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
1145       // Set sequence first because the add to the queue will wake the thread if sleeping.
1146       this.sequence = sequence;
1147       for (int i = 0; i < syncFutureCount; ++i) {
1148         this.syncFutures.add(syncFutures[i]);
1149       }
1150     }
1151 
1152     /**
1153      * Release the passed <code>syncFuture</code>
1154      * @param syncFuture
1155      * @param currentSequence
1156      * @param t
1157      * @return Returns 1.
1158      */
1159     private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
1160         final Throwable t) {
1161       if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
1162       // This function releases one sync future only.
1163       return 1;
1164     }
1165 
1166     /**
1167      * Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
1168      * @param currentSequence
1169      * @param t May be non-null if we are processing SyncFutures because an exception was thrown.
1170      * @return Count of SyncFutures we let go.
1171      */
1172     private int releaseSyncFutures(final long currentSequence, final Throwable t) {
1173       int syncCount = 0;
1174       for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
1175         if (syncFuture.getRingBufferSequence() > currentSequence) break;
1176         releaseSyncFuture(syncFuture, currentSequence, t);
1177         if (!this.syncFutures.remove(syncFuture)) {
1178           throw new IllegalStateException(syncFuture.toString());
1179         }
1180         syncCount++;
1181       }
1182       return syncCount;
1183     }
1184 
1185     /**
1186      * @param sequence The sequence we ran the filesystem sync against.
1187      * @return Current highest synced sequence.
1188      */
1189     private long updateHighestSyncedSequence(long sequence) {
1190       long currentHighestSyncedSequence;
1191       // Set the highestSyncedSequence IFF our current sequence id is the 'highest'.
1192       do {
1193         currentHighestSyncedSequence = highestSyncedSequence.get();
1194         if (currentHighestSyncedSequence >= sequence) {
1195           // Set the sync number to current highwater mark; might be able to let go more
1196           // queued sync futures
1197           sequence = currentHighestSyncedSequence;
1198           break;
1199         }
1200       } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
1201       return sequence;
1202     }
1203 
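    /**
     * The sync loop: take a SyncFuture off the queue, release it straight away if the writer has
     * already been synced past its sequence, otherwise sync the writer once and then release
     * every queued future at or below the sequence covered by that sync.
     */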
    public void run() {
      long currentSequence;
      while (!isInterrupted()) {
        int syncCount = 0;
        SyncFuture takeSyncFuture;
        try {
          while (true) {
            // We have to process what we 'take' from the queue
            takeSyncFuture = this.syncFutures.take();
            currentSequence = this.sequence;
            long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
            if (syncFutureSequence > currentSequence) {
              throw new IllegalStateException("currentSequence=" + currentSequence +
                ", syncFutureSequence=" + syncFutureSequence);
            }
            // See if we can process any syncfutures BEFORE we go sync.
            long currentHighestSyncedSequence = highestSyncedSequence.get();
            if (currentSequence < currentHighestSyncedSequence) {
              syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
              // Done with the 'take'.  Go around again and do a new 'take'.
              continue;
            }
            break;
          }
          // I got something.  Let's run.  Save off current sequence number in case it changes
          // while we run.
          TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
          long start = System.nanoTime();
          Throwable lastException = null;
          try {
            Trace.addTimelineAnnotation("syncing writer");
            writer.sync();
            Trace.addTimelineAnnotation("writer synced");
            currentSequence = updateHighestSyncedSequence(currentSequence);
          } catch (IOException e) {
            LOG.error("Error syncing, request close of WAL", e);
            lastException = e;
          } catch (Exception e) {
            LOG.warn("UNEXPECTED", e);
            lastException = e;
          } finally {
            // reattach the span to the future before releasing.
            takeSyncFuture.setSpan(scope.detach());
            // First release what we 'took' from the queue.
            syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
            // Can we release other syncs?
            syncCount += releaseSyncFutures(currentSequence, lastException);
            if (lastException != null) requestLogRoll();
            else checkLogRoll();
          }
          postSync(System.nanoTime() - start, syncCount);
        } catch (InterruptedException e) {
          // Presume legit interrupt.
          Thread.currentThread().interrupt();
        } catch (Throwable t) {
          LOG.warn("UNEXPECTED, continuing", t);
        }
      }
    }
  }

  /**
   * Schedule a log roll if needed.
   */
  void checkLogRoll() {
    // Will return immediately if we are in the middle of a WAL log roll currently.
    if (!rollWriterLock.tryLock()) return;
    boolean lowReplication;
    try {
      lowReplication = checkLowReplication();
    } finally {
      rollWriterLock.unlock();
    }
    try {
      if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
        requestLogRoll(lowReplication);
      }
    } catch (IOException e) {
      LOG.warn("Writer.getLength() failed; continuing", e);
    }
  }

  /*
   * @return true if number of replicas for the WAL is lower than threshold
   */
  private boolean checkLowReplication() {
    boolean logRollNeeded = false;
    // If the number of replicas in HDFS has fallen below the configured
    // value, then roll logs.
    try {
      int numCurrentReplicas = getLogReplication();
      if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
        if (this.lowReplicationRollEnabled) {
          if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. Found "
                + numCurrentReplicas + " replicas but expecting no less than "
                + this.minTolerableReplication + " replicas. "
                + "Requesting close of WAL. current pipeline: "
                + Arrays.toString(getPipeLine()));
            logRollNeeded = true;
            // If rollWriter is requested, increase consecutiveLogRolls. Once it
            // is larger than lowReplicationRollLimit, disable the
            // LowReplication-Roller
            this.consecutiveLogRolls.getAndIncrement();
          } else {
            LOG.warn("Too many consecutive RollWriter requests, it's a sign that "
                + "the total number of live datanodes is lower than the tolerable replicas.");
            this.consecutiveLogRolls.set(0);
            this.lowReplicationRollEnabled = false;
          }
        }
      } else if (numCurrentReplicas >= this.minTolerableReplication) {
        if (!this.lowReplicationRollEnabled) {
          // The new writer's log replicas is always the default value.
          // So we should not enable LowReplication-Roller. If numEntries
          // is lower than or equals 1, we consider it as a new writer.
          if (this.numEntries.get() <= 1) {
            return logRollNeeded;
          }
          // Once the live datanode number and the replicas return to normal,
          // enable the LowReplication-Roller.
          this.lowReplicationRollEnabled = true;
          LOG.info("LowReplication-Roller was enabled.");
        }
      }
    } catch (Exception e) {
      LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
        ", continuing...");
    }
    return logRollNeeded;
  }

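  // The next/get/publish calls below follow the usual disruptor claim-and-publish pattern:
  // claim a slot, load the payload into the pre-allocated truck for that slot, and publish in a
  // finally block so a failure while loading cannot leave the claimed slot unpublished.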
  private SyncFuture publishSyncOnRingBuffer() {
    return publishSyncOnRingBuffer(null);
  }

  private SyncFuture publishSyncOnRingBuffer(Span span) {
    long sequence = this.disruptor.getRingBuffer().next();
    SyncFuture syncFuture = getSyncFuture(sequence, span);
    try {
      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
      truck.loadPayload(syncFuture);
    } finally {
      this.disruptor.getRingBuffer().publish(sequence);
    }
    return syncFuture;
  }

  // Sync all known transactions
  private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
    return blockOnSync(publishSyncOnRingBuffer(span));
  }

  private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
    try {
      syncFuture.get();
      return syncFuture.getSpan();
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

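  // Each handler thread reuses a single SyncFuture instance, keyed by the thread itself, rather
  // than allocating a new future per sync request; reset(...) rearms it for the new sequence.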
  private SyncFuture getSyncFuture(final long sequence, Span span) {
    SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
    if (syncFuture == null) {
      syncFuture = new SyncFuture();
      this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
    }
    return syncFuture.reset(sequence, span);
  }

  private void postSync(final long timeInNanos, final int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg =
          new StringBuilder().append("Slow sync cost: ")
              .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
              .append(Arrays.toString(getPipeLine())).toString();
      Trace.addTimelineAnnotation(msg);
      LOG.info(msg);
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }

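  // Estimate the serialized size of the appended edit and report it, along with the append time,
  // to any registered WALActionsListeners.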
  private long postAppend(final Entry e, final long elapsedTime) {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += CellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime);
      }
    }
    return len;
  }

  /**
   * This method gets the datanode replication count for the current WAL.
   *
   * If the pipeline isn't started yet or is empty, you will get the default
   * replication factor.  Therefore, if this function returns 0, it means you
   * are not properly running with the HDFS-826 patch.
   */
  @VisibleForTesting
  int getLogReplication() {
    try {
      // In standalone mode, it will return 0.
      if (this.hdfs_out instanceof HdfsDataOutputStream) {
        return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication();
      }
    } catch (IOException e) {
      LOG.info("", e);
    }
    return 0;
  }

  @Override
  public void sync() throws IOException {
    TraceScope scope = Trace.startSpan("FSHLog.sync");
    try {
      scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
    } finally {
      assert scope == NullScope.INSTANCE || !scope.isDetached();
      scope.close();
    }
  }

  @Override
  public void sync(long txid) throws IOException {
    if (this.highestSyncedSequence.get() >= txid) {
      // Already sync'd.
      return;
    }
    TraceScope scope = Trace.startSpan("FSHLog.sync");
    try {
      scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
    } finally {
      assert scope == NullScope.INSTANCE || !scope.isDetached();
      scope.close();
    }
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(false);
  }

  private void requestLogRoll(boolean tooFewReplicas) {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(tooFewReplicas);
      }
    }
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of rolled log files */
  public int getNumRolledLogFiles() {
    return byWalRegionSequenceIds.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of log files in use */
  public int getNumLogFiles() {
    // + 1 for the currently active log file
    return getNumRolledLogFiles() + 1;
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  @Override
  public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) {
    if (!closeBarrier.beginOp()) {
      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
      return null;
    }
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public void completeCacheFlush(final byte [] encodedRegionName) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
    closeBarrier.endOp();
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
    closeBarrier.endOp();
  }

  @VisibleForTesting
  boolean isLowReplicationRollEnabled() {
    return lowReplicationRollEnabled;
  }

  public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
    ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));

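  /**
   * Split a directory of WAL files into per-region recovered edits. Only invoked from
   * {@link #main(String[])} with <code>--split</code>.
   */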
  private static void split(final Configuration conf, final Path p)
  throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDirectory()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = FSUtils.getRootDir(conf);
    final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
  }

  @Override
  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used when reporting to the master our oldest sequenceid
    // for use figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
    // from this.sequenceIdAccounting looks first in flushingOldestStoreSequenceIds, the
    // currently flushing sequence ids, and if anything is found there, it returns those. This is
    // the right thing to do for reporting oldest sequenceids to the master; we won't skip edits if
    // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
    // id is old even though we are currently flushing. This may mean we do too much flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  /**
   * This class is used for coordinating two threads, holding one thread at a
   * 'safe point' while the orchestrating thread does some work that requires the first thread
   * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another
   * thread.
   *
   * <p>Thread A signals Thread B to hold when it gets to a 'safe point'.  Thread A waits until
   * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A.
   * Thread B then holds at the 'safe point'.  Thread A, on notification that Thread B is paused,
   * goes ahead and does the work it needs to do while Thread B is holding.  When Thread A is done,
   * it flags B and then Thread A and Thread B continue along on their merry way.  Pause and
   * signalling 'zigzags' between the two participating threads.  We use two latches -- one the
   * inverse of the other -- pausing and signaling when states are achieved.
   *
   * <p>To start up the drama, Thread A creates an instance of this class each time it would do
   * this zigzag dance and passes it to Thread B (these classes use latches so it is one shot
   * only). Thread B notices the new instance (via reading a volatile reference or however) and it
   * starts to work toward the 'safe point'.  Thread A calls {@link #waitSafePoint(SyncFuture)}
   * when it cannot proceed until the Thread B 'safe point' is attained. Thread A will be held
   * inside {@link #waitSafePoint(SyncFuture)} until Thread B reaches the 'safe point'.  Once
   * there, Thread B frees Thread A by calling {@link #safePointAttained()}.  Thread A now knows
   * Thread B is at the 'safe point' and that it is holding there (when Thread B calls
   * {@link #safePointAttained()} it blocks here until Thread A calls {@link #releaseSafePoint()}).
   * Thread A proceeds to do what it needs to do while Thread B is paused.  When finished,
   * it lets Thread B loose by calling {@link #releaseSafePoint()} and away go both threads again.
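   *
   * <p>A minimal sketch of the handshake (illustrative only; in this WAL, Thread A is the thread
   * rolling the writer and Thread B is the ring buffer consumer):
   * <pre>{@code
   *   // Thread A (orchestrator)
   *   SafePointZigZagLatch latch = handler.attainSafePoint(); // hand the latch to Thread B
   *   latch.waitSafePoint(syncFuture);                        // block until B is parked
   *   // ... do the work that needs B held, e.g. swap the writer ...
   *   latch.releaseSafePoint();                               // let B continue
   *
   *   // Thread B (worker), once it notices the latch is cocked
   *   latch.safePointAttained();                              // park here until A releases
   * }</pre>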
   */
  static class SafePointZigZagLatch {
    /**
     * Count down this latch when safe point attained.
     */
    private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
    /**
     * Latch to wait on.  Will be released when we can proceed.
     */
    private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);

    /**
     * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
     * Thread A will be held in here until Thread B calls {@link #safePointAttained()}.
     * @param syncFuture We need this as a barometer on outstanding syncs.  If it comes home with
     * an exception, then something is up with our syncing.
     * @return The passed <code>syncFuture</code>
     * @throws InterruptedException
     * @throws FailedSyncBeforeLogCloseException
     */
    SyncFuture waitSafePoint(final SyncFuture syncFuture)
    throws InterruptedException, FailedSyncBeforeLogCloseException {
      while (true) {
        if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break;
        if (syncFuture.isThrowable()) {
          throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
        }
      }
      return syncFuture;
    }

    /**
     * Called by Thread B when it attains the 'safe point'.  In this method, Thread B signals
     * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
     * is called by Thread A.
     * @throws InterruptedException
     */
    void safePointAttained() throws InterruptedException {
      this.safePointAttainedLatch.countDown();
      this.safePointReleasedLatch.await();
    }

    /**
     * Called by Thread A when it is done with the work it needs to do while Thread B is
     * halted.  This will release the Thread B held in a call to {@link #safePointAttained()}.
     */
    void releaseSafePoint() {
      this.safePointReleasedLatch.countDown();
    }

    /**
     * @return True if this is a 'cocked', fresh instance, and not one that has already fired.
     */
    boolean isCocked() {
      return this.safePointAttainedLatch.getCount() > 0 &&
        this.safePointReleasedLatch.getCount() > 0;
    }
  }


  /**
   * Handler that is run by the disruptor ringbuffer consumer. Consumer is a SINGLE
   * 'writer/appender' thread.  Appends edits and starts up sync runs.  Tries its best to batch up
   * syncs.  There is no discernible benefit to batching appends so we just append as they come in
   * because it simplifies the below implementation.  See metrics for batching effectiveness
   * (In measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10
   * handler sync invocations for every actual dfsclient sync call; at 10 concurrent handlers,
   * YMMV).
   * <p>Herein, we have an array into which we store the sync futures as they come in.  When we
   * have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the
   * filesystem sync.  When it completes, it will then call
   * {@link SyncFuture#done(long, Throwable)} on each of the SyncFutures in the batch to release
   * blocked Handler threads.
   * <p>I've tried various effects to try and make latencies low while keeping throughput high.
   * I've tried keeping a single Queue of SyncFutures in this class, appending to its tail as the
   * syncs come in and having sync runner threads poll off the head to 'finish' completed
   * SyncFutures.  I've tried linkedlist, and various from concurrent utils whether
   * LinkedBlockingQueue or ArrayBlockingQueue, etc.  The more points of synchronization, the
   * more 'work' (according to 'perf stats') that has to be done; small increases in stall
   * percentages seem to have a big impact on throughput/latencies.  The below model where we have
   * an array into which we stash the syncs and then hand them off to the sync thread seemed like
   * a decent compromise.  See HBASE-8755 for more detail.
   */
  class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
    private final SyncRunner [] syncRunners;
    private final SyncFuture [] syncFutures;
    // Had 'interesting' issues when this was non-volatile.  On occasion, we'd not pass all
    // syncFutures to the next sync'ing thread.
    private volatile int syncFuturesCount = 0;
    private volatile SafePointZigZagLatch zigzagLatch;
    /**
     * Set if we get an exception appending or syncing so that all subsequent appends and syncs
     * on this WAL fail until the WAL is replaced.
     */
    private Exception exception = null;
    /**
     * Object to block on while waiting on safe point.
     */
    private final Object safePointWaiter = new Object();
    private volatile boolean shutdown = false;

    /**
     * Which syncrunner to use next.
     */
    private int syncRunnerIndex;

    RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
      this.syncFutures = new SyncFuture[maxHandlersCount];
      this.syncRunners = new SyncRunner[syncRunnerCount];
      for (int i = 0; i < syncRunnerCount; i++) {
        this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
      }
    }

    private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
      // There could be handler-count syncFutures outstanding.
      for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
      this.syncFuturesCount = 0;
    }

    /**
     * @return True if there are still outstanding sync futures that are not yet done.
     */
    private boolean isOutstandingSyncs() {
      for (int i = 0; i < this.syncFuturesCount; i++) {
        if (!this.syncFutures[i].isDone()) return true;
      }
      return false;
    }

    @Override
    // We can set endOfBatch in the below method if at end of our this.syncFutures array
    public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
    throws Exception {
      // Appends and syncs are coming in order off the ringbuffer.  We depend on this fact.  We'll
      // add appends to dfsclient as they come in.  Batching appends doesn't give any significant
      // benefit on measurement.  Handler sync calls we will batch up. If we get an exception
      // appending an edit, we fail all subsequent appends and syncs with the same exception until
      // the WAL is reset. It is important that we do not short-circuit and exit this method early.
      // It is important that we always go through the attainSafePoint at the end. Another thread,
      // the log roller, may be waiting on a signal from us here and will just hang without it.

      try {
        if (truck.hasSyncFuturePayload()) {
          this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
          // Force flush of syncs if we are carrying a full complement of syncFutures.
          if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
        } else if (truck.hasFSWALEntryPayload()) {
          TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
          try {
            FSWALEntry entry = truck.unloadFSWALEntryPayload();
            if (this.exception != null) {
              // We got an exception on an earlier attempt at append. Do not let this append
              // go through. Fail it but stamp the sequenceid into this append even though failed.
              // We need to do this to close the latch held down deep in WALKey...that is waiting
              // on sequenceid assignment otherwise it will just hang out (the #append method
              // called below does this also internally).
              entry.stampRegionSequenceId();
              // Return to keep processing events coming off the ringbuffer
              return;
            }
            append(entry);
          } catch (Exception e) {
            // Failed append. Record the exception.
            this.exception = e;
            // Return to keep processing events coming off the ringbuffer
            return;
          } finally {
            assert scope == NullScope.INSTANCE || !scope.isDetached();
            scope.close(); // append scope is complete
          }
        } else {
          // What is this if not an append or sync? Fail all up to this!!!
          cleanupOutstandingSyncsOnException(sequence,
            new IllegalStateException("Neither append nor sync"));
          // Return to keep processing.
          return;
        }

        // TODO: Check size and if big go ahead and call a sync if we have enough data.
        // This is a sync. If existing exception, fall through. Else look to see if batch.
        if (this.exception == null) {
          // If not a batch, return to consume more events from the ring buffer before proceeding;
          // we want to get up a batch of syncs and appends before we go do a filesystem sync.
          if (!endOfBatch || this.syncFuturesCount <= 0) return;
          // syncRunnerIndex is bound to the range [0, Integer.MAX_VALUE - 1] as follows:
          //   * The maximum value possible for syncRunners.length is Integer.MAX_VALUE
          //   * syncRunnerIndex starts at 0 and is incremented only here
          //   * after the increment, the value is bounded by the '%' operator to
          //     [0, syncRunners.length), presuming the value was positive prior to the '%'
          //     operator.
          //   * after being bound to [0, Integer.MAX_VALUE - 1], the new value is stored in
          //     syncRunnerIndex ensuring that it can't grow without bound and overflow.
          //   * note that the value after the increment must be positive, because the most it
          //     could have been prior was Integer.MAX_VALUE - 1 and we only increment by 1.
          this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
          try {
            // Below expects that the offer 'transfers' responsibility for the outstanding syncs to
            // the syncRunner. We should never get an exception in here.
            this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
              this.syncFuturesCount);
          } catch (Exception e) {
            // Should NEVER get here.
            requestLogRoll();
            this.exception = new DamagedWALException("Failed offering sync", e);
          }
        }
        // We may have picked up an exception above trying to offer sync
        if (this.exception != null) {
          cleanupOutstandingSyncsOnException(sequence,
            this.exception instanceof DamagedWALException ?
              this.exception :
              new DamagedWALException("On sync", this.exception));
        }
        attainSafePoint(sequence);
        this.syncFuturesCount = 0;
      } catch (Throwable t) {
        LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
      }
    }

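    /**
     * Set up a new safe-point handshake and return the latch the requesting thread should wait
     * on. Called by the thread that needs this handler parked at a safe point (in this WAL the
     * thread swapping out the writer); this handler then parks in
     * {@link #attainSafePoint(long)} once it notices the cocked latch.
     */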
    SafePointZigZagLatch attainSafePoint() {
      this.zigzagLatch = new SafePointZigZagLatch();
      return this.zigzagLatch;
    }

    /**
     * Check if we should attain safe point.  If so, go there and then wait till signalled before
     * proceeding.
     */
    private void attainSafePoint(final long currentSequence) {
      if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
      // If here, another thread is waiting on us to get to safe point.  Don't leave it hanging.
      beforeWaitOnSafePoint();
      try {
        // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
        // shutdown or unless our latch has been thrown because we have been aborted or unless
        // this WAL is broken and we can't get a sync/append to complete).
        while (!this.shutdown && this.zigzagLatch.isCocked() &&
            highestSyncedSequence.get() < currentSequence &&
            // We could be in here and all syncs are failing or failed. Check for this. Otherwise
            // we'll just be stuck here forever. In other words, ensure there are syncs running.
            isOutstandingSyncs()) {
          synchronized (this.safePointWaiter) {
            this.safePointWaiter.wait(0, 1);
          }
        }
        // Tell waiting thread we've attained safe point. Can clear this.throwable if set here
        // because we know that the next event through the ringbuffer will be going to a new WAL
        // after we do the zigzaglatch dance.
        this.exception = null;
        this.zigzagLatch.safePointAttained();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted ", e);
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Append to the WAL.  Does all CP and WAL listener calls.
     * @param entry The WAL entry to append.
     * @throws Exception
     */
    void append(final FSWALEntry entry) throws Exception {
      // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
      atHeadOfRingBufferEventHandlerAppend();

      long start = EnvironmentEdgeManager.currentTime();
      byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
      long regionSequenceId = WALKey.NO_SEQUENCE_ID;
      try {
        // We are about to append this edit; update the region-scoped sequence number.  Do it
        // here inside this single appending/writing thread.  Events are ordered on the ringbuffer
        // so region sequenceids will also be in order.
        regionSequenceId = entry.stampRegionSequenceId();
        // Edits are empty, there is nothing to append.  Maybe empty when we are looking for a
        // region sequence id only, a region edit/sequence id that is not associated with an actual
        // edit. It has to go through all the rigmarole to be sure we have the right ordering.
        if (entry.getEdit().isEmpty()) {
          return;
        }

        // Coprocessor hook.
        if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
            entry.getEdit())) {
          if (entry.getEdit().isReplay()) {
            // Set replication scope null so that this won't be replicated
            entry.getKey().setScopes(null);
          }
        }
        if (!listeners.isEmpty()) {
          for (WALActionsListener i : listeners) {
            // TODO: Why does listener take a table description and CPs take a regioninfo?  Fix.
            i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
              entry.getEdit());
          }
        }

        writer.append(entry);
        assert highestUnsyncedSequence < entry.getSequence();
        highestUnsyncedSequence = entry.getSequence();
        sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
          entry.isInMemstore());
        coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
        // Update metrics.
        postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
      } catch (Exception e) {
        String msg = "Append sequenceId=" + regionSequenceId + ", requesting roll of WAL";
        LOG.warn(msg, e);
        requestLogRoll();
        throw new DamagedWALException(msg, e);
      }
      numEntries.incrementAndGet();
    }

    @Override
    public void onStart() {
      for (SyncRunner syncRunner : this.syncRunners) syncRunner.start();
    }

    @Override
    public void onShutdown() {
      for (SyncRunner syncRunner : this.syncRunners) syncRunner.interrupt();
    }
  }

  /**
   * Exposed for testing only.  Used for tricks like halting the ring buffer append.
   */
  @VisibleForTesting
  void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private static void usage() {
    System.err.println("Usage: FSHLog <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump textual representation of passed one or more files");
    System.err.println("         For example: " +
      "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println("         For example: " +
      "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
  }

  /**
   * Pass one or more log file names and it will either dump out a text version
   * on <code>stdout</code> or split the specified log files.
   * @param args Command line arguments; <code>--dump</code> or <code>--split</code> followed by
   *   file or directory paths.
   * @throws IOException
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // either dump using the WALPrettyPrinter or split, depending on args
    if (args[0].compareTo("--dump") == 0) {
      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--perf") == 0) {
      LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:");
      LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
          args[1]);
      System.exit(-1);
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          FSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (IOException t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }

  /**
   * This method gets the pipeline for the current WAL.
   */
  @VisibleForTesting
  DatanodeInfo[] getPipeLine() {
    if (this.hdfs_out != null) {
      if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) {
        return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
      }
    }
    return new DatanodeInfo[0];
  }
}