1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.regionserver.wal; 19 20 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; 21 22 import java.io.FileNotFoundException; 23 import java.io.IOException; 24 import java.io.InterruptedIOException; 25 import java.io.OutputStream; 26 import java.lang.management.ManagementFactory; 27 import java.lang.management.MemoryUsage; 28 import java.lang.reflect.InvocationTargetException; 29 import java.net.URLEncoder; 30 import java.util.ArrayList; 31 import java.util.Arrays; 32 import java.util.Comparator; 33 import java.util.List; 34 import java.util.Map; 35 import java.util.NavigableMap; 36 import java.util.Set; 37 import java.util.concurrent.BlockingQueue; 38 import java.util.concurrent.ConcurrentHashMap; 39 import java.util.concurrent.ConcurrentSkipListMap; 40 import java.util.concurrent.CopyOnWriteArrayList; 41 import java.util.concurrent.CountDownLatch; 42 import java.util.concurrent.ExecutionException; 43 import java.util.concurrent.ExecutorService; 44 import java.util.concurrent.Executors; 45 import java.util.concurrent.LinkedBlockingQueue; 46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.atomic.AtomicBoolean; 48 import java.util.concurrent.atomic.AtomicInteger; 49 import java.util.concurrent.atomic.AtomicLong; 50 import java.util.concurrent.locks.ReentrantLock; 51 52 import org.apache.commons.logging.Log; 53 import org.apache.commons.logging.LogFactory; 54 import org.apache.hadoop.conf.Configuration; 55 import org.apache.hadoop.fs.FSDataOutputStream; 56 import org.apache.hadoop.fs.FileStatus; 57 import org.apache.hadoop.fs.FileSystem; 58 import org.apache.hadoop.fs.Path; 59 import org.apache.hadoop.fs.PathFilter; 60 import org.apache.hadoop.hbase.Cell; 61 import org.apache.hadoop.hbase.CellUtil; 62 import org.apache.hadoop.hbase.HBaseConfiguration; 63 import org.apache.hadoop.hbase.HConstants; 64 import org.apache.hadoop.hbase.HRegionInfo; 65 import org.apache.hadoop.hbase.HTableDescriptor; 66 import org.apache.hadoop.hbase.classification.InterfaceAudience; 67 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; 68 import org.apache.hadoop.hbase.util.Bytes; 69 import org.apache.hadoop.hbase.util.ClassSize; 70 import org.apache.hadoop.hbase.util.DrainBarrier; 71 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 72 import org.apache.hadoop.hbase.util.FSUtils; 73 import org.apache.hadoop.hbase.util.HasThread; 74 import org.apache.hadoop.hbase.util.Threads; 75 import org.apache.hadoop.hbase.wal.DefaultWALProvider; 76 import org.apache.hadoop.hbase.wal.WAL; 77 import org.apache.hadoop.hbase.wal.WALFactory; 78 import org.apache.hadoop.hbase.wal.WALKey; 79 import org.apache.hadoop.hbase.wal.WALPrettyPrinter; 80 import org.apache.hadoop.hbase.wal.WALProvider.Writer; 81 import org.apache.hadoop.hbase.wal.WALSplitter; 82 import org.apache.hadoop.hdfs.DFSOutputStream; 83 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; 84 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 85 import org.apache.hadoop.util.StringUtils; 86 import org.apache.htrace.NullScope; 87 import org.apache.htrace.Span; 88 import org.apache.htrace.Trace; 89 import org.apache.htrace.TraceScope; 90 91 import com.google.common.annotations.VisibleForTesting; 92 import com.lmax.disruptor.BlockingWaitStrategy; 93 import com.lmax.disruptor.EventHandler; 94 import com.lmax.disruptor.ExceptionHandler; 95 import com.lmax.disruptor.LifecycleAware; 96 import com.lmax.disruptor.TimeoutException; 97 import com.lmax.disruptor.dsl.Disruptor; 98 import com.lmax.disruptor.dsl.ProducerType; 99 100 /** 101 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. 102 * Only one WAL is ever being written at a time. When a WAL hits a configured maximum size, 103 * it is rolled. This is done internal to the implementation. 104 * 105 * <p>As data is flushed from the MemStore to other on-disk structures (files sorted by 106 * key, hfiles), a WAL becomes obsolete. We can let go of all the log edits/entries for a given 107 * HRegion-sequence id. A bunch of work in the below is done keeping account of these region 108 * sequence ids -- what is flushed out to hfiles, and what is yet in WAL and in memory only. 109 * 110 * <p>It is only practical to delete entire files. Thus, we delete an entire on-disk file 111 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older 112 * (smaller) than the most-recent flush. 113 * 114 * <p>To read an WAL, call {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, 115 * org.apache.hadoop.fs.Path)}. 116 * 117 * <h2>Failure Semantic</h2> 118 * If an exception on append or sync, roll the WAL because the current WAL is now a lame duck; 119 * any more appends or syncs will fail also with the same original exception. If we have made 120 * successful appends to the WAL and we then are unable to sync them, our current semantic is to 121 * return error to the client that the appends failed but also to abort the current context, 122 * usually the hosting server. We need to replay the WALs. TODO: Change this semantic. A roll of 123 * WAL may be sufficient as long as we have flagged client that the append failed. TODO: 124 * replication may pick up these last edits though they have been marked as failed append (Need to 125 * keep our own file lengths, not rely on HDFS). 126 */ 127 @InterfaceAudience.Private 128 public class FSHLog implements WAL { 129 // IMPLEMENTATION NOTES: 130 // 131 // At the core is a ring buffer. Our ring buffer is the LMAX Disruptor. It tries to 132 // minimize synchronizations and volatile writes when multiple contending threads as is the case 133 // here appending and syncing on a single WAL. The Disruptor is configured to handle multiple 134 // producers but it has one consumer only (the producers in HBase are IPC Handlers calling append 135 // and then sync). The single consumer/writer pulls the appends and syncs off the ring buffer. 136 // When a handler calls sync, it is given back a future. The producer 'blocks' on the future so 137 // it does not return until the sync completes. The future is passed over the ring buffer from 138 // the producer/handler to the consumer thread where it does its best to batch up the producer 139 // syncs so one WAL sync actually spans multiple producer sync invocations. How well the 140 // batching works depends on the write rate; i.e. we tend to batch more in times of 141 // high writes/syncs. 142 // 143 // Calls to append now also wait until the append has been done on the consumer side of the 144 // disruptor. We used to not wait but it makes the implemenation easier to grok if we have 145 // the region edit/sequence id after the append returns. 146 // 147 // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend 148 // once only? Probably hard given syncs take way longer than an append. 149 // 150 // The consumer threads pass the syncs off to multiple syncing threads in a round robin fashion 151 // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long poll writing the 152 // WAL). The consumer thread passes the futures to the sync threads for it to complete 153 // the futures when done. 154 // 155 // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It 156 // acts as a sort-of transaction id. It is always incrementing. 157 // 158 // The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that 159 // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a 160 // synchronization class used to halt the consumer at a safe point -- just after all outstanding 161 // syncs and appends have completed -- so the log roller can swap the WAL out under it. 162 163 private static final Log LOG = LogFactory.getLog(FSHLog.class); 164 165 private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms 166 167 /** 168 * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. 169 * Appends and syncs are each put on the ring which means handlers need to 170 * smash up against the ring twice (can we make it once only? ... maybe not since time to append 171 * is so different from time to sync and sometimes we don't want to sync or we want to async 172 * the sync). The ring is where we make sure of our ordering and it is also where we do 173 * batching up of handler sync calls. 174 */ 175 private final Disruptor<RingBufferTruck> disruptor; 176 177 /** 178 * An executorservice that runs the disruptor AppendEventHandler append executor. 179 */ 180 private final ExecutorService appendExecutor; 181 182 /** 183 * This fellow is run by the above appendExecutor service but it is all about batching up appends 184 * and syncs; it may shutdown without cleaning out the last few appends or syncs. To guard 185 * against this, keep a reference to this handler and do explicit close on way out to make sure 186 * all flushed out before we exit. 187 */ 188 private final RingBufferEventHandler ringBufferEventHandler; 189 190 /** 191 * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. 192 * TODO: Reus FSWALEntry's rather than create them anew each time as we do SyncFutures here. 193 * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them 194 * get them from this Map? 195 */ 196 private final Map<Thread, SyncFuture> syncFuturesByHandler; 197 198 /** 199 * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the 200 * ring buffer sequence. Maintained by the ring buffer consumer. 201 */ 202 private volatile long highestUnsyncedSequence = -1; 203 204 /** 205 * Updated to the ring buffer sequence of the last successful sync call. This can be less than 206 * {@link #highestUnsyncedSequence} for case where we have an append where a sync has not yet 207 * come in for it. Maintained by the syncing threads. 208 */ 209 private final AtomicLong highestSyncedSequence = new AtomicLong(0); 210 211 /** 212 * file system instance 213 */ 214 protected final FileSystem fs; 215 216 /** 217 * WAL directory, where all WAL files would be placed. 218 */ 219 private final Path fullPathLogDir; 220 221 /** 222 * dir path where old logs are kept. 223 */ 224 private final Path fullPathArchiveDir; 225 226 /** 227 * Matches just those wal files that belong to this wal instance. 228 */ 229 private final PathFilter ourFiles; 230 231 /** 232 * Prefix of a WAL file, usually the region server name it is hosted on. 233 */ 234 private final String logFilePrefix; 235 236 /** 237 * Suffix included on generated wal file names 238 */ 239 private final String logFileSuffix; 240 241 /** 242 * Prefix used when checking for wal membership. 243 */ 244 private final String prefixPathStr; 245 246 private final WALCoprocessorHost coprocessorHost; 247 248 /** 249 * conf object 250 */ 251 protected final Configuration conf; 252 253 /** Listeners that are called on WAL events. */ 254 private final List<WALActionsListener> listeners = 255 new CopyOnWriteArrayList<WALActionsListener>(); 256 257 @Override registerWALActionsListener(final WALActionsListener listener)258 public void registerWALActionsListener(final WALActionsListener listener) { 259 this.listeners.add(listener); 260 } 261 262 @Override unregisterWALActionsListener(final WALActionsListener listener)263 public boolean unregisterWALActionsListener(final WALActionsListener listener) { 264 return this.listeners.remove(listener); 265 } 266 267 @Override getCoprocessorHost()268 public WALCoprocessorHost getCoprocessorHost() { 269 return coprocessorHost; 270 } 271 272 /** 273 * FSDataOutputStream associated with the current SequenceFile.writer 274 */ 275 private FSDataOutputStream hdfs_out; 276 277 // All about log rolling if not enough replicas outstanding. 278 279 // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered 280 private final int minTolerableReplication; 281 282 private final int slowSyncNs; 283 284 // If live datanode count is lower than the default replicas value, 285 // RollWriter will be triggered in each sync(So the RollWriter will be 286 // triggered one by one in a short time). Using it as a workaround to slow 287 // down the roll frequency triggered by checkLowReplication(). 288 private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0); 289 290 private final int lowReplicationRollLimit; 291 292 // If consecutiveLogRolls is larger than lowReplicationRollLimit, 293 // then disable the rolling in checkLowReplication(). 294 // Enable it if the replications recover. 295 private volatile boolean lowReplicationRollEnabled = true; 296 297 /** 298 * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding 299 * sequence id as yet not flushed as well as the most recent edit sequence id appended to the 300 * WAL. Has facility for answering questions such as "Is it safe to GC a WAL?". 301 */ 302 private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); 303 304 /** 305 * Current log file. 306 */ 307 volatile Writer writer; 308 309 /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ 310 private final DrainBarrier closeBarrier = new DrainBarrier(); 311 312 /** 313 * This lock makes sure only one log roll runs at a time. Should not be taken while any other 314 * lock is held. We don't just use synchronized because that results in bogus and tedious 315 * findbugs warning when it thinks synchronized controls writer thread safety. It is held when 316 * we are actually rolling the log. It is checked when we are looking to see if we should roll 317 * the log or not. 318 */ 319 private final ReentrantLock rollWriterLock = new ReentrantLock(true); 320 321 private volatile boolean closed = false; 322 private final AtomicBoolean shutdown = new AtomicBoolean(false); 323 324 // The timestamp (in ms) when the log file was created. 325 private final AtomicLong filenum = new AtomicLong(-1); 326 327 // Number of transactions in the current Wal. 328 private final AtomicInteger numEntries = new AtomicInteger(0); 329 330 // If > than this size, roll the log. 331 private final long logrollsize; 332 333 /** 334 * The total size of wal 335 */ 336 private AtomicLong totalLogSize = new AtomicLong(0); 337 338 /* 339 * If more than this many logs, force flush of oldest region to oldest edit 340 * goes to disk. If too many and we crash, then will take forever replaying. 341 * Keep the number of logs tidy. 342 */ 343 private final int maxLogs; 344 345 /** Number of log close errors tolerated before we abort */ 346 private final int closeErrorsTolerated; 347 348 private final AtomicInteger closeErrorCount = new AtomicInteger(); 349 350 351 /** 352 * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. 353 * Throws an IllegalArgumentException if used to compare paths from different wals. 354 */ 355 final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() { 356 @Override 357 public int compare(Path o1, Path o2) { 358 long t1 = getFileNumFromFileName(o1); 359 long t2 = getFileNumFromFileName(o2); 360 if (t1 == t2) return 0; 361 return (t1 > t2) ? 1 : -1; 362 } 363 }; 364 365 /** 366 * Map of WAL log file to the latest sequence ids of all regions it has entries of. 367 * The map is sorted by the log file creation timestamp (contained in the log file name). 368 */ 369 private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds = 370 new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR); 371 372 /** 373 * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it 374 * logs using our logger instead of java native logger. 375 */ 376 static class RingBufferExceptionHandler implements ExceptionHandler { 377 @Override handleEventException(Throwable ex, long sequence, Object event)378 public void handleEventException(Throwable ex, long sequence, Object event) { 379 LOG.error("Sequence=" + sequence + ", event=" + event, ex); 380 throw new RuntimeException(ex); 381 } 382 383 @Override handleOnStartException(Throwable ex)384 public void handleOnStartException(Throwable ex) { 385 LOG.error(ex); 386 throw new RuntimeException(ex); 387 } 388 389 @Override handleOnShutdownException(Throwable ex)390 public void handleOnShutdownException(Throwable ex) { 391 LOG.error(ex); 392 throw new RuntimeException(ex); 393 } 394 } 395 396 /** 397 * Constructor. 398 * 399 * @param fs filesystem handle 400 * @param root path for stored and archived wals 401 * @param logDir dir where wals are stored 402 * @param conf configuration to use 403 * @throws IOException 404 */ FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)405 public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf) 406 throws IOException { 407 this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); 408 } 409 410 /** 411 * Create an edit log at the given <code>dir</code> location. 412 * 413 * You should never have to load an existing log. If there is a log at 414 * startup, it should have already been processed and deleted by the time the 415 * WAL object is started up. 416 * 417 * @param fs filesystem handle 418 * @param rootDir path to where logs and oldlogs 419 * @param logDir dir where wals are stored 420 * @param archiveDir dir where wals are archived 421 * @param conf configuration to use 422 * @param listeners Listeners on WAL events. Listeners passed here will 423 * be registered before we do anything else; e.g. the 424 * Constructor {@link #rollWriter()}. 425 * @param failIfWALExists If true IOException will be thrown if files related to this wal 426 * already exist. 427 * @param prefix should always be hostname and port in distributed env and 428 * it will be URL encoded before being used. 429 * If prefix is null, "wal" will be used 430 * @param suffix will be url encoded. null is treated as empty. non-empty must start with 431 * {@link DefaultWALProvider#WAL_FILE_NAME_DELIMITER} 432 * @throws IOException 433 */ FSHLog(final FileSystem fs, final Path rootDir, final String logDir, final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, final String suffix)434 public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, 435 final String archiveDir, final Configuration conf, 436 final List<WALActionsListener> listeners, 437 final boolean failIfWALExists, final String prefix, final String suffix) 438 throws IOException { 439 this.fs = fs; 440 this.fullPathLogDir = new Path(rootDir, logDir); 441 this.fullPathArchiveDir = new Path(rootDir, archiveDir); 442 this.conf = conf; 443 444 if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) { 445 throw new IOException("Unable to mkdir " + fullPathLogDir); 446 } 447 448 if (!fs.exists(this.fullPathArchiveDir)) { 449 if (!fs.mkdirs(this.fullPathArchiveDir)) { 450 throw new IOException("Unable to mkdir " + this.fullPathArchiveDir); 451 } 452 } 453 454 // If prefix is null||empty then just name it wal 455 this.logFilePrefix = 456 prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); 457 // we only correctly differentiate suffices when numeric ones start with '.' 458 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { 459 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + 460 "' but instead was '" + suffix + "'"); 461 } 462 // Now that it exists, set the storage policy for the entire directory of wal files related to 463 // this FSHLog instance 464 FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY, 465 HConstants.DEFAULT_WAL_STORAGE_POLICY); 466 this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); 467 this.prefixPathStr = new Path(fullPathLogDir, 468 logFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); 469 470 this.ourFiles = new PathFilter() { 471 @Override 472 public boolean accept(final Path fileName) { 473 // The path should start with dir/<prefix> and end with our suffix 474 final String fileNameString = fileName.toString(); 475 if (!fileNameString.startsWith(prefixPathStr)) { 476 return false; 477 } 478 if (logFileSuffix.isEmpty()) { 479 // in the case of the null suffix, we need to ensure the filename ends with a timestamp. 480 return org.apache.commons.lang.StringUtils.isNumeric( 481 fileNameString.substring(prefixPathStr.length())); 482 } else if (!fileNameString.endsWith(logFileSuffix)) { 483 return false; 484 } 485 return true; 486 } 487 }; 488 489 if (failIfWALExists) { 490 final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles); 491 if (null != walFiles && 0 != walFiles.length) { 492 throw new IOException("Target WAL already exists within directory " + fullPathLogDir); 493 } 494 } 495 496 // Register listeners. TODO: Should this exist anymore? We have CPs? 497 if (listeners != null) { 498 for (WALActionsListener i: listeners) { 499 registerWALActionsListener(i); 500 } 501 } 502 this.coprocessorHost = new WALCoprocessorHost(this, conf); 503 504 // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks 505 // (it costs a little x'ing bocks) 506 final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", 507 FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); 508 this.logrollsize = 509 (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); 510 511 float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, 512 conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, 513 HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); 514 boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; 515 if(maxLogsDefined){ 516 LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); 517 } 518 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 519 Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); 520 this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", 521 FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); 522 this.lowReplicationRollLimit = 523 conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); 524 this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0); 525 int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); 526 527 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + 528 ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + 529 ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" + 530 this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir); 531 532 // rollWriter sets this.hdfs_out if it can. 533 rollWriter(); 534 535 this.slowSyncNs = 536 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", 537 DEFAULT_SLOW_SYNC_TIME_MS); 538 539 // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is 540 // put on the ring buffer. 541 String hostingThreadName = Thread.currentThread().getName(); 542 this.appendExecutor = Executors. 543 newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); 544 // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will 545 // be stuck and make no progress if the buffer is filled with appends only and there is no 546 // sync. If no sync, then the handlers will be outstanding just waiting on sync completion 547 // before they return. 548 final int preallocatedEventCount = 549 this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); 550 // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense 551 // spinning as other strategies do. 552 this.disruptor = 553 new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount, 554 this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); 555 // Advance the ring buffer sequence so that it starts from 1 instead of 0, 556 // because SyncFuture.NOT_DONE = 0. 557 this.disruptor.getRingBuffer().next(); 558 this.ringBufferEventHandler = 559 new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5), 560 maxHandlersCount); 561 this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); 562 this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler}); 563 // Presize our map of SyncFutures by handler objects. 564 this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount); 565 // Starting up threads in constructor is a no no; Interface should have an init call. 566 this.disruptor.start(); 567 } 568 calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize)569 private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) { 570 MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); 571 int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); 572 return maxLogs; 573 } 574 575 /** 576 * Get the backing files associated with this WAL. 577 * @return may be null if there are no files. 578 */ getFiles()579 protected FileStatus[] getFiles() throws IOException { 580 return FSUtils.listStatus(fs, fullPathLogDir, ourFiles); 581 } 582 583 /** 584 * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate 585 * the default behavior (such as setting the maxRecoveryErrorCount value for example (see 586 * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the 587 * underlying HDFS OutputStream. 588 * NOTE: This could be removed once Hadoop1 support is removed. 589 * @return null if underlying stream is not ready. 590 */ 591 @VisibleForTesting getOutputStream()592 OutputStream getOutputStream() { 593 FSDataOutputStream fsdos = this.hdfs_out; 594 if (fsdos == null) return null; 595 return fsdos.getWrappedStream(); 596 } 597 598 @Override rollWriter()599 public byte [][] rollWriter() throws FailedLogCloseException, IOException { 600 return rollWriter(false); 601 } 602 603 /** 604 * retrieve the next path to use for writing. 605 * Increments the internal filenum. 606 */ getNewPath()607 private Path getNewPath() throws IOException { 608 this.filenum.set(System.currentTimeMillis()); 609 Path newPath = getCurrentFileName(); 610 while (fs.exists(newPath)) { 611 this.filenum.incrementAndGet(); 612 newPath = getCurrentFileName(); 613 } 614 return newPath; 615 } 616 getOldPath()617 Path getOldPath() { 618 long currentFilenum = this.filenum.get(); 619 Path oldPath = null; 620 if (currentFilenum > 0) { 621 // ComputeFilename will take care of meta wal filename 622 oldPath = computeFilename(currentFilenum); 623 } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? 624 return oldPath; 625 } 626 627 /** 628 * Tell listeners about pre log roll. 629 * @throws IOException 630 */ tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)631 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) 632 throws IOException { 633 if (!this.listeners.isEmpty()) { 634 for (WALActionsListener i : this.listeners) { 635 i.preLogRoll(oldPath, newPath); 636 } 637 } 638 } 639 640 /** 641 * Tell listeners about post log roll. 642 * @throws IOException 643 */ tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)644 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) 645 throws IOException { 646 if (!this.listeners.isEmpty()) { 647 for (WALActionsListener i : this.listeners) { 648 i.postLogRoll(oldPath, newPath); 649 } 650 } 651 } 652 653 /** 654 * Run a sync after opening to set up the pipeline. 655 * @param nextWriter 656 * @param startTimeNanos 657 */ preemptiveSync(final ProtobufLogWriter nextWriter)658 private void preemptiveSync(final ProtobufLogWriter nextWriter) { 659 long startTimeNanos = System.nanoTime(); 660 try { 661 nextWriter.sync(); 662 postSync(System.nanoTime() - startTimeNanos, 0); 663 } catch (IOException e) { 664 // optimization failed, no need to abort here. 665 LOG.warn("pre-sync failed but an optimization so keep going", e); 666 } 667 } 668 669 @Override rollWriter(boolean force)670 public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { 671 rollWriterLock.lock(); 672 try { 673 // Return if nothing to flush. 674 if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null; 675 byte [][] regionsToFlush = null; 676 if (this.closed) { 677 LOG.debug("WAL closed. Skipping rolling of writer"); 678 return regionsToFlush; 679 } 680 if (!closeBarrier.beginOp()) { 681 LOG.debug("WAL closing. Skipping rolling of writer"); 682 return regionsToFlush; 683 } 684 TraceScope scope = Trace.startSpan("FSHLog.rollWriter"); 685 try { 686 Path oldPath = getOldPath(); 687 Path newPath = getNewPath(); 688 // Any exception from here on is catastrophic, non-recoverable so we currently abort. 689 Writer nextWriter = this.createWriterInstance(newPath); 690 FSDataOutputStream nextHdfsOut = null; 691 if (nextWriter instanceof ProtobufLogWriter) { 692 nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream(); 693 // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. 694 // If this fails, we just keep going.... it is an optimization, not the end of the world. 695 preemptiveSync((ProtobufLogWriter)nextWriter); 696 } 697 tellListenersAboutPreLogRoll(oldPath, newPath); 698 // NewPath could be equal to oldPath if replaceWriter fails. 699 newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut); 700 tellListenersAboutPostLogRoll(oldPath, newPath); 701 // Can we delete any of the old log files? 702 if (getNumRolledLogFiles() > 0) { 703 cleanOldLogs(); 704 regionsToFlush = findRegionsToForceFlush(); 705 } 706 } finally { 707 closeBarrier.endOp(); 708 assert scope == NullScope.INSTANCE || !scope.isDetached(); 709 scope.close(); 710 } 711 return regionsToFlush; 712 } finally { 713 rollWriterLock.unlock(); 714 } 715 } 716 717 /** 718 * This method allows subclasses to inject different writers without having to 719 * extend other methods like rollWriter(). 720 * 721 * @return Writer instance 722 */ createWriterInstance(final Path path)723 protected Writer createWriterInstance(final Path path) throws IOException { 724 return DefaultWALProvider.createWriter(conf, fs, path, false); 725 } 726 727 /** 728 * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. 729 * @throws IOException 730 */ cleanOldLogs()731 private void cleanOldLogs() throws IOException { 732 List<Path> logsToArchive = null; 733 // For each log file, look at its Map of regions to highest sequence id; if all sequence ids 734 // are older than what is currently in memory, the WAL can be GC'd. 735 for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) { 736 Path log = e.getKey(); 737 Map<byte[], Long> sequenceNums = e.getValue(); 738 if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { 739 if (logsToArchive == null) logsToArchive = new ArrayList<Path>(); 740 logsToArchive.add(log); 741 if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log); 742 } 743 } 744 if (logsToArchive != null) { 745 for (Path p : logsToArchive) { 746 this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen()); 747 archiveLogFile(p); 748 this.byWalRegionSequenceIds.remove(p); 749 } 750 } 751 } 752 753 /** 754 * If the number of un-archived WAL files is greater than maximum allowed, check the first 755 * (oldest) WAL file, and returns those regions which should be flushed so that it can 756 * be archived. 757 * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file. 758 * @throws IOException 759 */ findRegionsToForceFlush()760 byte[][] findRegionsToForceFlush() throws IOException { 761 byte [][] regions = null; 762 int logCount = getNumRolledLogFiles(); 763 if (logCount > this.maxLogs && logCount > 0) { 764 Map.Entry<Path, Map<byte[], Long>> firstWALEntry = 765 this.byWalRegionSequenceIds.firstEntry(); 766 regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue()); 767 } 768 if (regions != null) { 769 StringBuilder sb = new StringBuilder(); 770 for (int i = 0; i < regions.length; i++) { 771 if (i > 0) sb.append(", "); 772 sb.append(Bytes.toStringBinary(regions[i])); 773 } 774 LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + 775 "; forcing flush of " + regions.length + " regions(s): " + sb.toString()); 776 } 777 return regions; 778 } 779 780 /** 781 * Used to manufacture race condition reliably. For testing only. 782 * @see #beforeWaitOnSafePoint() 783 */ 784 @VisibleForTesting afterCreatingZigZagLatch()785 protected void afterCreatingZigZagLatch() {} 786 787 /** 788 * @see #afterCreatingZigZagLatch() 789 */ 790 @VisibleForTesting beforeWaitOnSafePoint()791 protected void beforeWaitOnSafePoint() {}; 792 793 /** 794 * Cleans up current writer closing it and then puts in place the passed in 795 * <code>nextWriter</code>. 796 * 797 * In the case of creating a new WAL, oldPath will be null. 798 * 799 * In the case of rolling over from one file to the next, none of the params will be null. 800 * 801 * In the case of closing out this FSHLog with no further use newPath, nextWriter, and 802 * nextHdfsOut will be null. 803 * 804 * @param oldPath may be null 805 * @param newPath may be null 806 * @param nextWriter may be null 807 * @param nextHdfsOut may be null 808 * @return the passed in <code>newPath</code> 809 * @throws IOException if there is a problem flushing or closing the underlying FS 810 */ replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter, final FSDataOutputStream nextHdfsOut)811 Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter, 812 final FSDataOutputStream nextHdfsOut) 813 throws IOException { 814 // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer 815 // thread will eventually pause. An error hereafter needs to release the writer thread 816 // regardless -- hence the finally block below. Note, this method is called from the FSHLog 817 // constructor BEFORE the ring buffer is set running so it is null on first time through 818 // here; allow for that. 819 SyncFuture syncFuture = null; 820 SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)? 821 null: this.ringBufferEventHandler.attainSafePoint(); 822 afterCreatingZigZagLatch(); 823 TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); 824 try { 825 // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the 826 // ring buffer between the above notification of writer that we want it to go to 827 // 'safe point' and then here where we are waiting on it to attain safe point. Use 828 // 'sendSync' instead of 'sync' because we do not want this thread to block waiting on it 829 // to come back. Cleanup this syncFuture down below after we are ready to run again. 830 try { 831 if (zigzagLatch != null) { 832 Trace.addTimelineAnnotation("awaiting safepoint"); 833 syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer()); 834 } 835 } catch (FailedSyncBeforeLogCloseException e) { 836 // If unflushed/unsynced entries on close, it is reason to abort. 837 if (isUnflushedEntries()) throw e; 838 LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " + 839 e.getMessage()); 840 } 841 842 // It is at the safe point. Swap out writer from under the blocked writer thread. 843 // TODO: This is close is inline with critical section. Should happen in background? 844 try { 845 if (this.writer != null) { 846 Trace.addTimelineAnnotation("closing writer"); 847 this.writer.close(); 848 Trace.addTimelineAnnotation("writer closed"); 849 } 850 this.closeErrorCount.set(0); 851 } catch (IOException ioe) { 852 int errors = closeErrorCount.incrementAndGet(); 853 if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) { 854 LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + 855 ioe.getMessage() + "\", errors=" + errors + 856 "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK"); 857 } else { 858 throw ioe; 859 } 860 } 861 this.writer = nextWriter; 862 this.hdfs_out = nextHdfsOut; 863 int oldNumEntries = this.numEntries.get(); 864 this.numEntries.set(0); 865 final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); 866 if (oldPath != null) { 867 this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest()); 868 long oldFileLen = this.fs.getFileStatus(oldPath).getLen(); 869 this.totalLogSize.addAndGet(oldFileLen); 870 LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + 871 ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + 872 newPathString); 873 } else { 874 LOG.info("New WAL " + newPathString); 875 } 876 } catch (InterruptedException ie) { 877 // Perpetuate the interrupt 878 Thread.currentThread().interrupt(); 879 } catch (IOException e) { 880 long count = getUnflushedEntriesCount(); 881 LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e); 882 throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e); 883 } finally { 884 try { 885 // Let the writer thread go regardless, whether error or not. 886 if (zigzagLatch != null) { 887 zigzagLatch.releaseSafePoint(); 888 // syncFuture will be null if we failed our wait on safe point above. Otherwise, if 889 // latch was obtained successfully, the sync we threw in either trigger the latch or it 890 // got stamped with an exception because the WAL was damaged and we could not sync. Now 891 // the write pipeline has been opened up again by releasing the safe point, process the 892 // syncFuture we got above. This is probably a noop but it may be stale exception from 893 // when old WAL was in place. Catch it if so. 894 if (syncFuture != null) { 895 try { 896 blockOnSync(syncFuture); 897 } catch (IOException ioe) { 898 if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe); 899 } 900 } 901 } 902 } finally { 903 scope.close(); 904 } 905 } 906 return newPath; 907 } 908 getUnflushedEntriesCount()909 long getUnflushedEntriesCount() { 910 long highestSynced = this.highestSyncedSequence.get(); 911 return highestSynced > this.highestUnsyncedSequence? 912 0: this.highestUnsyncedSequence - highestSynced; 913 } 914 isUnflushedEntries()915 boolean isUnflushedEntries() { 916 return getUnflushedEntriesCount() > 0; 917 } 918 919 /* 920 * only public so WALSplitter can use. 921 * @return archived location of a WAL file with the given path p 922 */ getWALArchivePath(Path archiveDir, Path p)923 public static Path getWALArchivePath(Path archiveDir, Path p) { 924 return new Path(archiveDir, p.getName()); 925 } 926 archiveLogFile(final Path p)927 private void archiveLogFile(final Path p) throws IOException { 928 Path newPath = getWALArchivePath(this.fullPathArchiveDir, p); 929 // Tell our listeners that a log is going to be archived. 930 if (!this.listeners.isEmpty()) { 931 for (WALActionsListener i : this.listeners) { 932 i.preLogArchive(p, newPath); 933 } 934 } 935 LOG.info("Archiving " + p + " to " + newPath); 936 if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { 937 throw new IOException("Unable to rename " + p + " to " + newPath); 938 } 939 // Tell our listeners that a log has been archived. 940 if (!this.listeners.isEmpty()) { 941 for (WALActionsListener i : this.listeners) { 942 i.postLogArchive(p, newPath); 943 } 944 } 945 } 946 947 /** 948 * This is a convenience method that computes a new filename with a given 949 * file-number. 950 * @param filenum to use 951 * @return Path 952 */ computeFilename(final long filenum)953 protected Path computeFilename(final long filenum) { 954 if (filenum < 0) { 955 throw new RuntimeException("WAL file number can't be < 0"); 956 } 957 String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix; 958 return new Path(fullPathLogDir, child); 959 } 960 961 /** 962 * This is a convenience method that computes a new filename with a given 963 * using the current WAL file-number 964 * @return Path 965 */ getCurrentFileName()966 public Path getCurrentFileName() { 967 return computeFilename(this.filenum.get()); 968 } 969 970 @Override toString()971 public String toString() { 972 return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")"; 973 } 974 975 /** 976 * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. 977 * This helper method returns the creation timestamp from a given log file. 978 * It extracts the timestamp assuming the filename is created with the 979 * {@link #computeFilename(long filenum)} method. 980 * @param fileName 981 * @return timestamp, as in the log file name. 982 */ getFileNumFromFileName(Path fileName)983 protected long getFileNumFromFileName(Path fileName) { 984 if (fileName == null) throw new IllegalArgumentException("file name can't be null"); 985 if (!ourFiles.accept(fileName)) { 986 throw new IllegalArgumentException("The log file " + fileName + 987 " doesn't belong to this WAL. (" + toString() + ")"); 988 } 989 final String fileNameString = fileName.toString(); 990 String chompedPath = fileNameString.substring(prefixPathStr.length(), 991 (fileNameString.length() - logFileSuffix.length())); 992 return Long.parseLong(chompedPath); 993 } 994 995 @Override close()996 public void close() throws IOException { 997 shutdown(); 998 final FileStatus[] files = getFiles(); 999 if (null != files && 0 != files.length) { 1000 for (FileStatus file : files) { 1001 Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath()); 1002 // Tell our listeners that a log is going to be archived. 1003 if (!this.listeners.isEmpty()) { 1004 for (WALActionsListener i : this.listeners) { 1005 i.preLogArchive(file.getPath(), p); 1006 } 1007 } 1008 1009 if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { 1010 throw new IOException("Unable to rename " + file.getPath() + " to " + p); 1011 } 1012 // Tell our listeners that a log was archived. 1013 if (!this.listeners.isEmpty()) { 1014 for (WALActionsListener i : this.listeners) { 1015 i.postLogArchive(file.getPath(), p); 1016 } 1017 } 1018 } 1019 LOG.debug("Moved " + files.length + " WAL file(s) to " + 1020 FSUtils.getPath(this.fullPathArchiveDir)); 1021 } 1022 LOG.info("Closed WAL: " + toString()); 1023 } 1024 1025 @Override shutdown()1026 public void shutdown() throws IOException { 1027 if (shutdown.compareAndSet(false, true)) { 1028 try { 1029 // Prevent all further flushing and rolling. 1030 closeBarrier.stopAndDrainOps(); 1031 } catch (InterruptedException e) { 1032 LOG.error("Exception while waiting for cache flushes and log rolls", e); 1033 Thread.currentThread().interrupt(); 1034 } 1035 1036 // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we 1037 // have stopped incoming appends before calling this else it will not shutdown. We are 1038 // conservative below waiting a long time and if not elapsed, then halting. 1039 if (this.disruptor != null) { 1040 long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); 1041 try { 1042 this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); 1043 } catch (TimeoutException e) { 1044 LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " + 1045 "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)"); 1046 this.disruptor.halt(); 1047 this.disruptor.shutdown(); 1048 } 1049 } 1050 // With disruptor down, this is safe to let go. 1051 if (this.appendExecutor != null) this.appendExecutor.shutdown(); 1052 1053 // Tell our listeners that the log is closing 1054 if (!this.listeners.isEmpty()) { 1055 for (WALActionsListener i : this.listeners) { 1056 i.logCloseRequested(); 1057 } 1058 } 1059 this.closed = true; 1060 if (LOG.isDebugEnabled()) { 1061 LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir)); 1062 } 1063 if (this.writer != null) { 1064 this.writer.close(); 1065 this.writer = null; 1066 } 1067 } 1068 } 1069 1070 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", 1071 justification="Will never be null") 1072 @Override append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, final WALEdit edits, final boolean inMemstore)1073 public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, 1074 final WALEdit edits, final boolean inMemstore) throws IOException { 1075 if (this.closed) throw new IOException("Cannot append; log is closed"); 1076 // Make a trace scope for the append. It is closed on other side of the ring buffer by the 1077 // single consuming thread. Don't have to worry about it. 1078 TraceScope scope = Trace.startSpan("FSHLog.append"); 1079 1080 // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need 1081 // all this to make a key and then below to append the edit, we need to carry htd, info, 1082 // etc. all over the ring buffer. 1083 FSWALEntry entry = null; 1084 long sequence = this.disruptor.getRingBuffer().next(); 1085 try { 1086 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); 1087 // TODO: reuse FSWALEntry as we do SyncFuture rather create per append. 1088 entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore); 1089 truck.loadPayload(entry, scope.detach()); 1090 } finally { 1091 this.disruptor.getRingBuffer().publish(sequence); 1092 } 1093 return sequence; 1094 } 1095 1096 /** 1097 * Thread to runs the hdfs sync call. This call takes a while to complete. This is the longest 1098 * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run 1099 * multiple threads sync'ng rather than one that just syncs in series so we have better 1100 * latencies; otherwise, an edit that arrived just after a sync started, might have to wait 1101 * almost the length of two sync invocations before it is marked done. 1102 * <p>When the sync completes, it marks all the passed in futures done. On the other end of the 1103 * sync future is a blocked thread, usually a regionserver Handler. There may be more than one 1104 * future passed in the case where a few threads arrive at about the same time and all invoke 1105 * 'sync'. In this case we'll batch up the invocations and run one filesystem sync only for a 1106 * batch of Handler sync invocations. Do not confuse these Handler SyncFutures with the futures 1107 * an ExecutorService returns when you call submit. We have no use for these in this model. These 1108 * SyncFutures are 'artificial', something to hold the Handler until the filesystem sync 1109 * completes. 1110 */ 1111 private class SyncRunner extends HasThread { 1112 private volatile long sequence; 1113 // Keep around last exception thrown. Clear on successful sync. 1114 private final BlockingQueue<SyncFuture> syncFutures; 1115 1116 /** 1117 * UPDATE! 1118 * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, 1119 * we will put the result of the actual hdfs sync call as the result. 1120 * @param sequence The sequence number on the ring buffer when this thread was set running. 1121 * If this actual writer sync completes then all appends up this point have been 1122 * flushed/synced/pushed to datanodes. If we fail, then the passed in <code>syncs</code> 1123 * futures will return the exception to their clients; some of the edits may have made it out 1124 * to data nodes but we will report all that were part of this session as failed. 1125 */ SyncRunner(final String name, final int maxHandlersCount)1126 SyncRunner(final String name, final int maxHandlersCount) { 1127 super(name); 1128 // LinkedBlockingQueue because of 1129 // http://www.javacodegeeks.com/2010/09/java-best-practices-queue-battle-and.html 1130 // Could use other blockingqueues here or concurrent queues. 1131 // 1132 // We could let the capacity be 'open' but bound it so we get alerted in pathological case 1133 // where we cannot sync and we have a bunch of threads all backed up waiting on their syncs 1134 // to come in. LinkedBlockingQueue actually shrinks when you remove elements so Q should 1135 // stay neat and tidy in usual case. Let the max size be three times the maximum handlers. 1136 // The passed in maxHandlerCount is the user-level handlers which is what we put up most of 1137 // but HBase has other handlers running too -- opening region handlers which want to write 1138 // the meta table when succesful (i.e. sync), closing handlers -- etc. These are usually 1139 // much fewer in number than the user-space handlers so Q-size should be user handlers plus 1140 // some space for these other handlers. Lets multiply by 3 for good-measure. 1141 this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3); 1142 } 1143 offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount)1144 void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) { 1145 // Set sequence first because the add to the queue will wake the thread if sleeping. 1146 this.sequence = sequence; 1147 for (int i = 0; i < syncFutureCount; ++i) { 1148 this.syncFutures.add(syncFutures[i]); 1149 } 1150 } 1151 1152 /** 1153 * Release the passed <code>syncFuture</code> 1154 * @param syncFuture 1155 * @param currentSequence 1156 * @param t 1157 * @return Returns 1. 1158 */ releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence, final Throwable t)1159 private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence, 1160 final Throwable t) { 1161 if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException(); 1162 // This function releases one sync future only. 1163 return 1; 1164 } 1165 1166 /** 1167 * Release all SyncFutures whose sequence is <= <code>currentSequence</code>. 1168 * @param currentSequence 1169 * @param t May be non-null if we are processing SyncFutures because an exception was thrown. 1170 * @return Count of SyncFutures we let go. 1171 */ releaseSyncFutures(final long currentSequence, final Throwable t)1172 private int releaseSyncFutures(final long currentSequence, final Throwable t) { 1173 int syncCount = 0; 1174 for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { 1175 if (syncFuture.getRingBufferSequence() > currentSequence) break; 1176 releaseSyncFuture(syncFuture, currentSequence, t); 1177 if (!this.syncFutures.remove(syncFuture)) { 1178 throw new IllegalStateException(syncFuture.toString()); 1179 } 1180 syncCount++; 1181 } 1182 return syncCount; 1183 } 1184 1185 /** 1186 * @param sequence The sequence we ran the filesystem sync against. 1187 * @return Current highest synced sequence. 1188 */ updateHighestSyncedSequence(long sequence)1189 private long updateHighestSyncedSequence(long sequence) { 1190 long currentHighestSyncedSequence; 1191 // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. 1192 do { 1193 currentHighestSyncedSequence = highestSyncedSequence.get(); 1194 if (currentHighestSyncedSequence >= sequence) { 1195 // Set the sync number to current highwater mark; might be able to let go more 1196 // queued sync futures 1197 sequence = currentHighestSyncedSequence; 1198 break; 1199 } 1200 } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); 1201 return sequence; 1202 } 1203 run()1204 public void run() { 1205 long currentSequence; 1206 while (!isInterrupted()) { 1207 int syncCount = 0; 1208 SyncFuture takeSyncFuture; 1209 try { 1210 while (true) { 1211 // We have to process what we 'take' from the queue 1212 takeSyncFuture = this.syncFutures.take(); 1213 currentSequence = this.sequence; 1214 long syncFutureSequence = takeSyncFuture.getRingBufferSequence(); 1215 if (syncFutureSequence > currentSequence) { 1216 throw new IllegalStateException("currentSequence=" + syncFutureSequence + 1217 ", syncFutureSequence=" + syncFutureSequence); 1218 } 1219 // See if we can process any syncfutures BEFORE we go sync. 1220 long currentHighestSyncedSequence = highestSyncedSequence.get(); 1221 if (currentSequence < currentHighestSyncedSequence) { 1222 syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); 1223 // Done with the 'take'. Go around again and do a new 'take'. 1224 continue; 1225 } 1226 break; 1227 } 1228 // I got something. Lets run. Save off current sequence number in case it changes 1229 // while we run. 1230 TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan()); 1231 long start = System.nanoTime(); 1232 Throwable lastException = null; 1233 try { 1234 Trace.addTimelineAnnotation("syncing writer"); 1235 writer.sync(); 1236 Trace.addTimelineAnnotation("writer synced"); 1237 currentSequence = updateHighestSyncedSequence(currentSequence); 1238 } catch (IOException e) { 1239 LOG.error("Error syncing, request close of WAL", e); 1240 lastException = e; 1241 } catch (Exception e) { 1242 LOG.warn("UNEXPECTED", e); 1243 lastException = e; 1244 } finally { 1245 // reattach the span to the future before releasing. 1246 takeSyncFuture.setSpan(scope.detach()); 1247 // First release what we 'took' from the queue. 1248 syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException); 1249 // Can we release other syncs? 1250 syncCount += releaseSyncFutures(currentSequence, lastException); 1251 if (lastException != null) requestLogRoll(); 1252 else checkLogRoll(); 1253 } 1254 postSync(System.nanoTime() - start, syncCount); 1255 } catch (InterruptedException e) { 1256 // Presume legit interrupt. 1257 Thread.currentThread().interrupt(); 1258 } catch (Throwable t) { 1259 LOG.warn("UNEXPECTED, continuing", t); 1260 } 1261 } 1262 } 1263 } 1264 1265 /** 1266 * Schedule a log roll if needed. 1267 */ checkLogRoll()1268 void checkLogRoll() { 1269 // Will return immediately if we are in the middle of a WAL log roll currently. 1270 if (!rollWriterLock.tryLock()) return; 1271 boolean lowReplication; 1272 try { 1273 lowReplication = checkLowReplication(); 1274 } finally { 1275 rollWriterLock.unlock(); 1276 } 1277 try { 1278 if (lowReplication || writer != null && writer.getLength() > logrollsize) { 1279 requestLogRoll(lowReplication); 1280 } 1281 } catch (IOException e) { 1282 LOG.warn("Writer.getLength() failed; continuing", e); 1283 } 1284 } 1285 1286 /* 1287 * @return true if number of replicas for the WAL is lower than threshold 1288 */ checkLowReplication()1289 private boolean checkLowReplication() { 1290 boolean logRollNeeded = false; 1291 // if the number of replicas in HDFS has fallen below the configured 1292 // value, then roll logs. 1293 try { 1294 int numCurrentReplicas = getLogReplication(); 1295 if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) { 1296 if (this.lowReplicationRollEnabled) { 1297 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) { 1298 LOG.warn("HDFS pipeline error detected. " + "Found " 1299 + numCurrentReplicas + " replicas but expecting no less than " 1300 + this.minTolerableReplication + " replicas. " 1301 + " Requesting close of WAL. current pipeline: " 1302 + Arrays.toString(getPipeLine())); 1303 logRollNeeded = true; 1304 // If rollWriter is requested, increase consecutiveLogRolls. Once it 1305 // is larger than lowReplicationRollLimit, disable the 1306 // LowReplication-Roller 1307 this.consecutiveLogRolls.getAndIncrement(); 1308 } else { 1309 LOG.warn("Too many consecutive RollWriter requests, it's a sign of " 1310 + "the total number of live datanodes is lower than the tolerable replicas."); 1311 this.consecutiveLogRolls.set(0); 1312 this.lowReplicationRollEnabled = false; 1313 } 1314 } 1315 } else if (numCurrentReplicas >= this.minTolerableReplication) { 1316 if (!this.lowReplicationRollEnabled) { 1317 // The new writer's log replicas is always the default value. 1318 // So we should not enable LowReplication-Roller. If numEntries 1319 // is lower than or equals 1, we consider it as a new writer. 1320 if (this.numEntries.get() <= 1) { 1321 return logRollNeeded; 1322 } 1323 // Once the live datanode number and the replicas return to normal, 1324 // enable the LowReplication-Roller. 1325 this.lowReplicationRollEnabled = true; 1326 LOG.info("LowReplication-Roller was enabled."); 1327 } 1328 } 1329 } catch (Exception e) { 1330 LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + 1331 ", continuing..."); 1332 } 1333 return logRollNeeded; 1334 } 1335 publishSyncOnRingBuffer()1336 private SyncFuture publishSyncOnRingBuffer() { 1337 return publishSyncOnRingBuffer(null); 1338 } 1339 publishSyncOnRingBuffer(Span span)1340 private SyncFuture publishSyncOnRingBuffer(Span span) { 1341 long sequence = this.disruptor.getRingBuffer().next(); 1342 SyncFuture syncFuture = getSyncFuture(sequence, span); 1343 try { 1344 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); 1345 truck.loadPayload(syncFuture); 1346 } finally { 1347 this.disruptor.getRingBuffer().publish(sequence); 1348 } 1349 return syncFuture; 1350 } 1351 1352 // Sync all known transactions publishSyncThenBlockOnCompletion(Span span)1353 private Span publishSyncThenBlockOnCompletion(Span span) throws IOException { 1354 return blockOnSync(publishSyncOnRingBuffer(span)); 1355 } 1356 blockOnSync(final SyncFuture syncFuture)1357 private Span blockOnSync(final SyncFuture syncFuture) throws IOException { 1358 // Now we have published the ringbuffer, halt the current thread until we get an answer back. 1359 try { 1360 syncFuture.get(); 1361 return syncFuture.getSpan(); 1362 } catch (InterruptedException ie) { 1363 LOG.warn("Interrupted", ie); 1364 throw convertInterruptedExceptionToIOException(ie); 1365 } catch (ExecutionException e) { 1366 throw ensureIOException(e.getCause()); 1367 } 1368 } 1369 convertInterruptedExceptionToIOException(final InterruptedException ie)1370 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { 1371 Thread.currentThread().interrupt(); 1372 IOException ioe = new InterruptedIOException(); 1373 ioe.initCause(ie); 1374 return ioe; 1375 } 1376 getSyncFuture(final long sequence, Span span)1377 private SyncFuture getSyncFuture(final long sequence, Span span) { 1378 SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); 1379 if (syncFuture == null) { 1380 syncFuture = new SyncFuture(); 1381 this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); 1382 } 1383 return syncFuture.reset(sequence, span); 1384 } 1385 postSync(final long timeInNanos, final int handlerSyncs)1386 private void postSync(final long timeInNanos, final int handlerSyncs) { 1387 if (timeInNanos > this.slowSyncNs) { 1388 String msg = 1389 new StringBuilder().append("Slow sync cost: ") 1390 .append(timeInNanos / 1000000).append(" ms, current pipeline: ") 1391 .append(Arrays.toString(getPipeLine())).toString(); 1392 Trace.addTimelineAnnotation(msg); 1393 LOG.info(msg); 1394 } 1395 if (!listeners.isEmpty()) { 1396 for (WALActionsListener listener : listeners) { 1397 listener.postSync(timeInNanos, handlerSyncs); 1398 } 1399 } 1400 } 1401 postAppend(final Entry e, final long elapsedTime)1402 private long postAppend(final Entry e, final long elapsedTime) { 1403 long len = 0; 1404 if (!listeners.isEmpty()) { 1405 for (Cell cell : e.getEdit().getCells()) { 1406 len += CellUtil.estimatedSerializedSizeOf(cell); 1407 } 1408 for (WALActionsListener listener : listeners) { 1409 listener.postAppend(len, elapsedTime); 1410 } 1411 } 1412 return len; 1413 } 1414 1415 1416 /** 1417 * This method gets the datanode replication count for the current WAL. 1418 * 1419 * If the pipeline isn't started yet or is empty, you will get the default 1420 * replication factor. Therefore, if this function returns 0, it means you 1421 * are not properly running with the HDFS-826 patch. 1422 * @throws InvocationTargetException 1423 * @throws IllegalAccessException 1424 * @throws IllegalArgumentException 1425 * 1426 * @throws Exception 1427 */ 1428 @VisibleForTesting getLogReplication()1429 int getLogReplication() { 1430 try { 1431 //in standalone mode, it will return 0 1432 if (this.hdfs_out instanceof HdfsDataOutputStream) { 1433 return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication(); 1434 } 1435 } catch (IOException e) { 1436 LOG.info("", e); 1437 } 1438 return 0; 1439 } 1440 1441 @Override sync()1442 public void sync() throws IOException { 1443 TraceScope scope = Trace.startSpan("FSHLog.sync"); 1444 try { 1445 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); 1446 } finally { 1447 assert scope == NullScope.INSTANCE || !scope.isDetached(); 1448 scope.close(); 1449 } 1450 } 1451 1452 @Override sync(long txid)1453 public void sync(long txid) throws IOException { 1454 if (this.highestSyncedSequence.get() >= txid){ 1455 // Already sync'd. 1456 return; 1457 } 1458 TraceScope scope = Trace.startSpan("FSHLog.sync"); 1459 try { 1460 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach())); 1461 } finally { 1462 assert scope == NullScope.INSTANCE || !scope.isDetached(); 1463 scope.close(); 1464 } 1465 } 1466 1467 // public only until class moves to o.a.h.h.wal requestLogRoll()1468 public void requestLogRoll() { 1469 requestLogRoll(false); 1470 } 1471 requestLogRoll(boolean tooFewReplicas)1472 private void requestLogRoll(boolean tooFewReplicas) { 1473 if (!this.listeners.isEmpty()) { 1474 for (WALActionsListener i: this.listeners) { 1475 i.logRollRequested(tooFewReplicas); 1476 } 1477 } 1478 } 1479 1480 // public only until class moves to o.a.h.h.wal 1481 /** @return the number of rolled log files */ getNumRolledLogFiles()1482 public int getNumRolledLogFiles() { 1483 return byWalRegionSequenceIds.size(); 1484 } 1485 1486 // public only until class moves to o.a.h.h.wal 1487 /** @return the number of log files in use */ getNumLogFiles()1488 public int getNumLogFiles() { 1489 // +1 for current use log 1490 return getNumRolledLogFiles() + 1; 1491 } 1492 1493 // public only until class moves to o.a.h.h.wal 1494 /** @return the size of log files in use */ getLogFileSize()1495 public long getLogFileSize() { 1496 return this.totalLogSize.get(); 1497 } 1498 1499 @Override startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families)1500 public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) { 1501 if (!closeBarrier.beginOp()) { 1502 LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); 1503 return null; 1504 } 1505 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); 1506 } 1507 1508 @Override completeCacheFlush(final byte [] encodedRegionName)1509 public void completeCacheFlush(final byte [] encodedRegionName) { 1510 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); 1511 closeBarrier.endOp(); 1512 } 1513 1514 @Override abortCacheFlush(byte[] encodedRegionName)1515 public void abortCacheFlush(byte[] encodedRegionName) { 1516 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); 1517 closeBarrier.endOp(); 1518 } 1519 1520 @VisibleForTesting isLowReplicationRollEnabled()1521 boolean isLowReplicationRollEnabled() { 1522 return lowReplicationRollEnabled; 1523 } 1524 1525 public static final long FIXED_OVERHEAD = ClassSize.align( 1526 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + 1527 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG)); 1528 split(final Configuration conf, final Path p)1529 private static void split(final Configuration conf, final Path p) 1530 throws IOException { 1531 FileSystem fs = FileSystem.get(conf); 1532 if (!fs.exists(p)) { 1533 throw new FileNotFoundException(p.toString()); 1534 } 1535 if (!fs.getFileStatus(p).isDirectory()) { 1536 throw new IOException(p + " is not a directory"); 1537 } 1538 1539 final Path baseDir = FSUtils.getRootDir(conf); 1540 final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); 1541 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); 1542 } 1543 1544 1545 @Override getEarliestMemstoreSeqNum(byte[] encodedRegionName)1546 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { 1547 // Used by tests. Deprecated as too subtle for general usage. 1548 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); 1549 } 1550 1551 @Override getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName)1552 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 1553 // This method is used by tests and for figuring if we should flush or not because our 1554 // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use 1555 // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId 1556 // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the 1557 // currently flushing sequence ids, and if anything found there, it is returning these. This is 1558 // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if 1559 // we crash during the flush. For figuring what to flush, we might get requeued if our sequence 1560 // id is old even though we are currently flushing. This may mean we do too much flushing. 1561 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); 1562 } 1563 1564 /** 1565 * This class is used coordinating two threads holding one thread at a 1566 * 'safe point' while the orchestrating thread does some work that requires the first thread 1567 * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another 1568 * thread. 1569 * 1570 * <p>Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until 1571 * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. 1572 * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, 1573 * goes ahead and does the work it needs to do while Thread B is holding. When Thread A is done, 1574 * it flags B and then Thread A and Thread B continue along on their merry way. Pause and 1575 * signalling 'zigzags' between the two participating threads. We use two latches -- one the 1576 * inverse of the other -- pausing and signaling when states are achieved. 1577 * 1578 * <p>To start up the drama, Thread A creates an instance of this class each time it would do 1579 * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot 1580 * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it 1581 * starts to work toward the 'safe point'. Thread A calls {@link #waitSafePoint()} when it 1582 * cannot proceed until the Thread B 'safe point' is attained. Thread A will be held inside in 1583 * {@link #waitSafePoint()} until Thread B reaches the 'safe point'. Once there, Thread B 1584 * frees Thread A by calling {@link #safePointAttained()}. Thread A now knows Thread B 1585 * is at the 'safe point' and that it is holding there (When Thread B calls 1586 * {@link #safePointAttained()} it blocks here until Thread A calls {@link #releaseSafePoint()}). 1587 * Thread A proceeds to do what it needs to do while Thread B is paused. When finished, 1588 * it lets Thread B lose by calling {@link #releaseSafePoint()} and away go both Threads again. 1589 */ 1590 static class SafePointZigZagLatch { 1591 /** 1592 * Count down this latch when safe point attained. 1593 */ 1594 private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1); 1595 /** 1596 * Latch to wait on. Will be released when we can proceed. 1597 */ 1598 private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); 1599 1600 /** 1601 * For Thread A to call when it is ready to wait on the 'safe point' to be attained. 1602 * Thread A will be held in here until Thread B calls {@link #safePointAttained()} 1603 * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with 1604 * an exception, then something is up w/ our syncing. 1605 * @throws InterruptedException 1606 * @throws ExecutionException 1607 * @return The passed <code>syncFuture</code> 1608 * @throws FailedSyncBeforeLogCloseException 1609 */ waitSafePoint(final SyncFuture syncFuture)1610 SyncFuture waitSafePoint(final SyncFuture syncFuture) 1611 throws InterruptedException, FailedSyncBeforeLogCloseException { 1612 while (true) { 1613 if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break; 1614 if (syncFuture.isThrowable()) { 1615 throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable()); 1616 } 1617 } 1618 return syncFuture; 1619 } 1620 1621 /** 1622 * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals 1623 * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} 1624 * is called by Thread A. 1625 * @throws InterruptedException 1626 */ safePointAttained()1627 void safePointAttained() throws InterruptedException { 1628 this.safePointAttainedLatch.countDown(); 1629 this.safePointReleasedLatch.await(); 1630 } 1631 1632 /** 1633 * Called by Thread A when it is done with the work it needs to do while Thread B is 1634 * halted. This will release the Thread B held in a call to {@link #safePointAttained()} 1635 */ releaseSafePoint()1636 void releaseSafePoint() { 1637 this.safePointReleasedLatch.countDown(); 1638 } 1639 1640 /** 1641 * @return True is this is a 'cocked', fresh instance, and not one that has already fired. 1642 */ isCocked()1643 boolean isCocked() { 1644 return this.safePointAttainedLatch.getCount() > 0 && 1645 this.safePointReleasedLatch.getCount() > 0; 1646 } 1647 } 1648 1649 /** 1650 * Handler that is run by the disruptor ringbuffer consumer. Consumer is a SINGLE 1651 * 'writer/appender' thread. Appends edits and starts up sync runs. Tries its best to batch up 1652 * syncs. There is no discernible benefit batching appends so we just append as they come in 1653 * because it simplifies the below implementation. See metrics for batching effectiveness 1654 * (In measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10 1655 * handler sync invocations for every actual dfsclient sync call; at 10 concurrent handlers, 1656 * YMMV). 1657 * <p>Herein, we have an array into which we store the sync futures as they come in. When we 1658 * have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the 1659 * filesystem sync. When it completes, it will then call 1660 * {@link SyncFuture#done(long, Throwable)} on each of SyncFutures in the batch to release 1661 * blocked Handler threads. 1662 * <p>I've tried various effects to try and make latencies low while keeping throughput high. 1663 * I've tried keeping a single Queue of SyncFutures in this class appending to its tail as the 1664 * syncs coming and having sync runner threads poll off the head to 'finish' completed 1665 * SyncFutures. I've tried linkedlist, and various from concurrent utils whether 1666 * LinkedBlockingQueue or ArrayBlockingQueue, etc. The more points of synchronization, the 1667 * more 'work' (according to 'perf stats') that has to be done; small increases in stall 1668 * percentages seem to have a big impact on throughput/latencies. The below model where we have 1669 * an array into which we stash the syncs and then hand them off to the sync thread seemed like 1670 * a decent compromise. See HBASE-8755 for more detail. 1671 */ 1672 class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware { 1673 private final SyncRunner [] syncRunners; 1674 private final SyncFuture [] syncFutures; 1675 // Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all 1676 // syncFutures to the next sync'ing thread. 1677 private volatile int syncFuturesCount = 0; 1678 private volatile SafePointZigZagLatch zigzagLatch; 1679 /** 1680 * Set if we get an exception appending or syncing so that all subsequence appends and syncs 1681 * on this WAL fail until WAL is replaced. 1682 */ 1683 private Exception exception = null; 1684 /** 1685 * Object to block on while waiting on safe point. 1686 */ 1687 private final Object safePointWaiter = new Object(); 1688 private volatile boolean shutdown = false; 1689 1690 /** 1691 * Which syncrunner to use next. 1692 */ 1693 private int syncRunnerIndex; 1694 RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount)1695 RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) { 1696 this.syncFutures = new SyncFuture[maxHandlersCount]; 1697 this.syncRunners = new SyncRunner[syncRunnerCount]; 1698 for (int i = 0; i < syncRunnerCount; i++) { 1699 this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount); 1700 } 1701 } 1702 cleanupOutstandingSyncsOnException(final long sequence, final Exception e)1703 private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { 1704 // There could be handler-count syncFutures outstanding. 1705 for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e); 1706 this.syncFuturesCount = 0; 1707 } 1708 1709 /** 1710 * @return True if outstanding sync futures still 1711 */ isOutstandingSyncs()1712 private boolean isOutstandingSyncs() { 1713 for (int i = 0; i < this.syncFuturesCount; i++) { 1714 if (!this.syncFutures[i].isDone()) return true; 1715 } 1716 return false; 1717 } 1718 1719 @Override 1720 // We can set endOfBatch in the below method if at end of our this.syncFutures array onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)1721 public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch) 1722 throws Exception { 1723 // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll 1724 // add appends to dfsclient as they come in. Batching appends doesn't give any significant 1725 // benefit on measurement. Handler sync calls we will batch up. If we get an exception 1726 // appending an edit, we fail all subsequent appends and syncs with the same exception until 1727 // the WAL is reset. It is important that we not short-circuit and exit early this method. 1728 // It is important that we always go through the attainSafePoint on the end. Another thread, 1729 // the log roller may be waiting on a signal from us here and will just hang without it. 1730 1731 try { 1732 if (truck.hasSyncFuturePayload()) { 1733 this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload(); 1734 // Force flush of syncs if we are carrying a full complement of syncFutures. 1735 if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true; 1736 } else if (truck.hasFSWALEntryPayload()) { 1737 TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload()); 1738 try { 1739 FSWALEntry entry = truck.unloadFSWALEntryPayload(); 1740 if (this.exception != null) { 1741 // We got an exception on an earlier attempt at append. Do not let this append 1742 // go through. Fail it but stamp the sequenceid into this append though failed. 1743 // We need to do this to close the latch held down deep in WALKey...that is waiting 1744 // on sequenceid assignment otherwise it will just hang out (The #append method 1745 // called below does this also internally). 1746 entry.stampRegionSequenceId(); 1747 // Return to keep processing events coming off the ringbuffer 1748 return; 1749 } 1750 append(entry); 1751 } catch (Exception e) { 1752 // Failed append. Record the exception. 1753 this.exception = e; 1754 // Return to keep processing events coming off the ringbuffer 1755 return; 1756 } finally { 1757 assert scope == NullScope.INSTANCE || !scope.isDetached(); 1758 scope.close(); // append scope is complete 1759 } 1760 } else { 1761 // What is this if not an append or sync. Fail all up to this!!! 1762 cleanupOutstandingSyncsOnException(sequence, 1763 new IllegalStateException("Neither append nor sync")); 1764 // Return to keep processing. 1765 return; 1766 } 1767 1768 // TODO: Check size and if big go ahead and call a sync if we have enough data. 1769 // This is a sync. If existing exception, fall through. Else look to see if batch. 1770 if (this.exception == null) { 1771 // If not a batch, return to consume more events from the ring buffer before proceeding; 1772 // we want to get up a batch of syncs and appends before we go do a filesystem sync. 1773 if (!endOfBatch || this.syncFuturesCount <= 0) return; 1774 // syncRunnerIndex is bound to the range [0, Integer.MAX_INT - 1] as follows: 1775 // * The maximum value possible for syncRunners.length is Integer.MAX_INT 1776 // * syncRunnerIndex starts at 0 and is incremented only here 1777 // * after the increment, the value is bounded by the '%' operator to [0, syncRunners.length), 1778 // presuming the value was positive prior to the '%' operator. 1779 // * after being bound to [0, Integer.MAX_INT - 1], the new value is stored in syncRunnerIndex 1780 // ensuring that it can't grow without bound and overflow. 1781 // * note that the value after the increment must be positive, because the most it could have 1782 // been prior was Integer.MAX_INT - 1 and we only increment by 1. 1783 this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length; 1784 try { 1785 // Below expects that the offer 'transfers' responsibility for the outstanding syncs to 1786 // the syncRunner. We should never get an exception in here. 1787 this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures, 1788 this.syncFuturesCount); 1789 } catch (Exception e) { 1790 // Should NEVER get here. 1791 requestLogRoll(); 1792 this.exception = new DamagedWALException("Failed offering sync", e); 1793 } 1794 } 1795 // We may have picked up an exception above trying to offer sync 1796 if (this.exception != null) { 1797 cleanupOutstandingSyncsOnException(sequence, 1798 this.exception instanceof DamagedWALException? 1799 this.exception: 1800 new DamagedWALException("On sync", this.exception)); 1801 } 1802 attainSafePoint(sequence); 1803 this.syncFuturesCount = 0; 1804 } catch (Throwable t) { 1805 LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t); 1806 } 1807 } 1808 attainSafePoint()1809 SafePointZigZagLatch attainSafePoint() { 1810 this.zigzagLatch = new SafePointZigZagLatch(); 1811 return this.zigzagLatch; 1812 } 1813 1814 /** 1815 * Check if we should attain safe point. If so, go there and then wait till signalled before 1816 * we proceeding. 1817 */ attainSafePoint(final long currentSequence)1818 private void attainSafePoint(final long currentSequence) { 1819 if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return; 1820 // If here, another thread is waiting on us to get to safe point. Don't leave it hanging. 1821 beforeWaitOnSafePoint(); 1822 try { 1823 // Wait on outstanding syncers; wait for them to finish syncing (unless we've been 1824 // shutdown or unless our latch has been thrown because we have been aborted or unless 1825 // this WAL is broken and we can't get a sync/append to complete). 1826 while (!this.shutdown && this.zigzagLatch.isCocked() && 1827 highestSyncedSequence.get() < currentSequence && 1828 // We could be in here and all syncs are failing or failed. Check for this. Otherwise 1829 // we'll just be stuck here for ever. In other words, ensure there syncs running. 1830 isOutstandingSyncs()) { 1831 synchronized (this.safePointWaiter) { 1832 this.safePointWaiter.wait(0, 1); 1833 } 1834 } 1835 // Tell waiting thread we've attained safe point. Can clear this.throwable if set here 1836 // because we know that next event through the ringbuffer will be going to a new WAL 1837 // after we do the zigzaglatch dance. 1838 this.exception = null; 1839 this.zigzagLatch.safePointAttained(); 1840 } catch (InterruptedException e) { 1841 LOG.warn("Interrupted ", e); 1842 Thread.currentThread().interrupt(); 1843 } 1844 } 1845 1846 /** 1847 * Append to the WAL. Does all CP and WAL listener calls. 1848 * @param entry 1849 * @throws Exception 1850 */ append(final FSWALEntry entry)1851 void append(final FSWALEntry entry) throws Exception { 1852 // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. 1853 atHeadOfRingBufferEventHandlerAppend(); 1854 1855 long start = EnvironmentEdgeManager.currentTime(); 1856 byte [] encodedRegionName = entry.getKey().getEncodedRegionName(); 1857 long regionSequenceId = WALKey.NO_SEQUENCE_ID; 1858 try { 1859 // We are about to append this edit; update the region-scoped sequence number. Do it 1860 // here inside this single appending/writing thread. Events are ordered on the ringbuffer 1861 // so region sequenceids will also be in order. 1862 regionSequenceId = entry.stampRegionSequenceId(); 1863 // Edits are empty, there is nothing to append. Maybe empty when we are looking for a 1864 // region sequence id only, a region edit/sequence id that is not associated with an actual 1865 // edit. It has to go through all the rigmarole to be sure we have the right ordering. 1866 if (entry.getEdit().isEmpty()) { 1867 return; 1868 } 1869 1870 // Coprocessor hook. 1871 if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), 1872 entry.getEdit())) { 1873 if (entry.getEdit().isReplay()) { 1874 // Set replication scope null so that this won't be replicated 1875 entry.getKey().setScopes(null); 1876 } 1877 } 1878 if (!listeners.isEmpty()) { 1879 for (WALActionsListener i: listeners) { 1880 // TODO: Why does listener take a table description and CPs take a regioninfo? Fix. 1881 i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(), 1882 entry.getEdit()); 1883 } 1884 } 1885 1886 writer.append(entry); 1887 assert highestUnsyncedSequence < entry.getSequence(); 1888 highestUnsyncedSequence = entry.getSequence(); 1889 sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, 1890 entry.isInMemstore()); 1891 coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); 1892 // Update metrics. 1893 postAppend(entry, EnvironmentEdgeManager.currentTime() - start); 1894 } catch (Exception e) { 1895 String msg = "Append sequenceId=" + regionSequenceId + ", requesting roll of WAL"; 1896 LOG.warn(msg, e); 1897 requestLogRoll(); 1898 throw new DamagedWALException(msg, e); 1899 } 1900 numEntries.incrementAndGet(); 1901 } 1902 1903 @Override 1904 public void onStart() { 1905 for (SyncRunner syncRunner: this.syncRunners) syncRunner.start(); 1906 } 1907 1908 @Override 1909 public void onShutdown() { 1910 for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt(); 1911 } 1912 } 1913 1914 /** 1915 * Exposed for testing only. Use to tricks like halt the ring buffer appending. 1916 */ 1917 @VisibleForTesting 1918 void atHeadOfRingBufferEventHandlerAppend() { 1919 // Noop 1920 } 1921 1922 private static IOException ensureIOException(final Throwable t) { 1923 return (t instanceof IOException)? (IOException)t: new IOException(t); 1924 } 1925 1926 private static void usage() { 1927 System.err.println("Usage: FSHLog <ARGS>"); 1928 System.err.println("Arguments:"); 1929 System.err.println(" --dump Dump textual representation of passed one or more files"); 1930 System.err.println(" For example: " + 1931 "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE"); 1932 System.err.println(" --split Split the passed directory of WAL logs"); 1933 System.err.println(" For example: " + 1934 "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR"); 1935 } 1936 1937 /** 1938 * Pass one or more log file names and it will either dump out a text version 1939 * on <code>stdout</code> or split the specified log files. 1940 * 1941 * @param args 1942 * @throws IOException 1943 */ 1944 public static void main(String[] args) throws IOException { 1945 if (args.length < 2) { 1946 usage(); 1947 System.exit(-1); 1948 } 1949 // either dump using the WALPrettyPrinter or split, depending on args 1950 if (args[0].compareTo("--dump") == 0) { 1951 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); 1952 } else if (args[0].compareTo("--perf") == 0) { 1953 LOG.fatal("Please use the WALPerformanceEvaluation tool instead. i.e.:"); 1954 LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + 1955 args[1]); 1956 System.exit(-1); 1957 } else if (args[0].compareTo("--split") == 0) { 1958 Configuration conf = HBaseConfiguration.create(); 1959 for (int i = 1; i < args.length; i++) { 1960 try { 1961 Path logPath = new Path(args[i]); 1962 FSUtils.setFsDefault(conf, logPath); 1963 split(conf, logPath); 1964 } catch (IOException t) { 1965 t.printStackTrace(System.err); 1966 System.exit(-1); 1967 } 1968 } 1969 } else { 1970 usage(); 1971 System.exit(-1); 1972 } 1973 } 1974 1975 /** 1976 * This method gets the pipeline for the current WAL. 1977 */ 1978 @VisibleForTesting 1979 DatanodeInfo[] getPipeLine() { 1980 if (this.hdfs_out != null) { 1981 if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) { 1982 return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline(); 1983 } 1984 } 1985 return new DatanodeInfo[0]; 1986 } 1987 } 1988