1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 20 21 package org.apache.hadoop.hbase.wal; 22 23 import java.io.IOException; 24 import java.util.Arrays; 25 import java.io.InterruptedIOException; 26 import java.util.Collections; 27 import java.util.List; 28 import java.util.concurrent.atomic.AtomicReference; 29 30 import com.google.common.annotations.VisibleForTesting; 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.hadoop.hbase.classification.InterfaceAudience; 34 import org.apache.hadoop.conf.Configuration; 35 import org.apache.hadoop.fs.FSDataInputStream; 36 import org.apache.hadoop.fs.FileSystem; 37 import org.apache.hadoop.fs.Path; 38 import org.apache.hadoop.hbase.HConstants; 39 import org.apache.hadoop.hbase.wal.WAL.Reader; 40 import org.apache.hadoop.hbase.wal.WALProvider.Writer; 41 import org.apache.hadoop.hbase.util.CancelableProgressable; 42 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 43 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; 44 45 // imports for things that haven't moved from regionserver.wal yet. 46 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; 47 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; 48 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader; 49 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 50 51 /** 52 * Entry point for users of the Write Ahead Log. 53 * Acts as the shim between internal use and the particular WALProvider we use to handle wal 54 * requests. 55 * 56 * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available 57 * implementations: 58 * <ul> 59 * <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently 60 * "filesystem"</li> 61 * <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop 62 * FileSystem interface, normally HDFS.</li> 63 * <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region 64 * server.</li> 65 * </ul> 66 * 67 * Alternatively, you may provide a custome implementation of {@link WALProvider} by class name. 68 */ 69 @InterfaceAudience.Private 70 public class WALFactory { 71 72 private static final Log LOG = LogFactory.getLog(WALFactory.class); 73 74 /** 75 * Maps between configuration names for providers and implementation classes. 76 */ 77 static enum Providers { 78 defaultProvider(DefaultWALProvider.class), 79 filesystem(DefaultWALProvider.class), 80 multiwal(BoundedRegionGroupingProvider.class); 81 82 Class<? extends WALProvider> clazz; Providers(Class<? extends WALProvider> clazz)83 Providers(Class<? extends WALProvider> clazz) { 84 this.clazz = clazz; 85 } 86 } 87 88 public static final String WAL_PROVIDER = "hbase.wal.provider"; 89 static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); 90 91 static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; 92 static final String DEFAULT_META_WAL_PROVIDER = Providers.defaultProvider.name(); 93 94 final String factoryId; 95 final WALProvider provider; 96 // The meta updates are written to a different wal. If this 97 // regionserver holds meta regions, then this ref will be non-null. 98 // lazily intialized; most RegionServers don't deal with META 99 final AtomicReference<WALProvider> metaProvider = new AtomicReference<WALProvider>(); 100 101 /** 102 * Configuration-specified WAL Reader used when a custom reader is requested 103 */ 104 private final Class<? extends DefaultWALProvider.Reader> logReaderClass; 105 106 /** 107 * How long to attempt opening in-recovery wals 108 */ 109 private final int timeoutMillis; 110 111 private final Configuration conf; 112 113 // Used for the singleton WALFactory, see below. WALFactory(Configuration conf)114 private WALFactory(Configuration conf) { 115 // this code is duplicated here so we can keep our members final. 116 // until we've moved reader/writer construction down into providers, this initialization must 117 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 118 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 119 /* TODO Both of these are probably specific to the fs wal provider */ 120 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 121 DefaultWALProvider.Reader.class); 122 this.conf = conf; 123 // end required early initialization 124 125 // this instance can't create wals, just reader/writers. 126 provider = null; 127 factoryId = SINGLETON_ID; 128 } 129 130 /** 131 * instantiate a provider from a config property. 132 * requires conf to have already been set (as well as anything the provider might need to read). 133 */ getProvider(final String key, final String defaultValue, final List<WALActionsListener> listeners, final String providerId)134 WALProvider getProvider(final String key, final String defaultValue, 135 final List<WALActionsListener> listeners, final String providerId) throws IOException { 136 Class<? extends WALProvider> clazz; 137 try { 138 clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz; 139 } catch (IllegalArgumentException exception) { 140 // Fall back to them specifying a class name 141 // Note that the passed default class shouldn't actually be used, since the above only fails 142 // when there is a config value present. 143 clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class); 144 } 145 LOG.info("Instantiating WALProvider of type " + clazz); 146 try { 147 final WALProvider result = clazz.newInstance(); 148 result.init(this, conf, listeners, providerId); 149 return result; 150 } catch (InstantiationException exception) { 151 LOG.error("couldn't set up WALProvider, check config key " + key); 152 LOG.debug("Exception details for failure to load WALProvider.", exception); 153 throw new IOException("couldn't set up WALProvider", exception); 154 } catch (IllegalAccessException exception) { 155 LOG.error("couldn't set up WALProvider, check config key " + key); 156 LOG.debug("Exception details for failure to load WALProvider.", exception); 157 throw new IOException("couldn't set up WALProvider", exception); 158 } 159 } 160 161 /** 162 * @param conf must not be null, will keep a reference to read params in later reader/writer 163 * instances. 164 * @param listeners may be null. will be given to all created wals (and not meta-wals) 165 * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations 166 * to make a directory 167 */ WALFactory(final Configuration conf, final List<WALActionsListener> listeners, final String factoryId)168 public WALFactory(final Configuration conf, final List<WALActionsListener> listeners, 169 final String factoryId) throws IOException { 170 // until we've moved reader/writer construction down into providers, this initialization must 171 // happen prior to provider initialization, in case they need to instantiate a reader/writer. 172 timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); 173 /* TODO Both of these are probably specific to the fs wal provider */ 174 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, 175 DefaultWALProvider.Reader.class); 176 this.conf = conf; 177 this.factoryId = factoryId; 178 // end required early initialization 179 if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { 180 provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null); 181 } else { 182 // special handling of existing configuration behavior. 183 LOG.warn("Running with WAL disabled."); 184 provider = new DisabledWALProvider(); 185 provider.init(this, conf, null, factoryId); 186 } 187 } 188 189 /** 190 * Shutdown all WALs and clean up any underlying storage. 191 * Use only when you will not need to replay and edits that have gone to any wals from this 192 * factory. 193 */ close()194 public void close() throws IOException { 195 final WALProvider metaProvider = this.metaProvider.get(); 196 if (null != metaProvider) { 197 metaProvider.close(); 198 } 199 // close is called on a WALFactory with null provider in the case of contention handling 200 // within the getInstance method. 201 if (null != provider) { 202 provider.close(); 203 } 204 } 205 206 /** 207 * Tell the underlying WAL providers to shut down, but do not clean up underlying storage. 208 * If you are not ending cleanly and will need to replay edits from this factory's wals, 209 * use this method if you can as it will try to leave things as tidy as possible. 210 */ shutdown()211 public void shutdown() throws IOException { 212 IOException exception = null; 213 final WALProvider metaProvider = this.metaProvider.get(); 214 if (null != metaProvider) { 215 try { 216 metaProvider.shutdown(); 217 } catch(IOException ioe) { 218 exception = ioe; 219 } 220 } 221 provider.shutdown(); 222 if (null != exception) { 223 throw exception; 224 } 225 } 226 227 /** 228 * @param identifier may not be null, contents will not be altered 229 */ getWAL(final byte[] identifier)230 public WAL getWAL(final byte[] identifier) throws IOException { 231 return provider.getWAL(identifier); 232 } 233 234 /** 235 * @param identifier may not be null, contents will not be altered 236 */ getMetaWAL(final byte[] identifier)237 public WAL getMetaWAL(final byte[] identifier) throws IOException { 238 WALProvider metaProvider = this.metaProvider.get(); 239 if (null == metaProvider) { 240 final WALProvider temp = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER, 241 Collections.<WALActionsListener>singletonList(new MetricsWAL()), 242 DefaultWALProvider.META_WAL_PROVIDER_ID); 243 if (this.metaProvider.compareAndSet(null, temp)) { 244 metaProvider = temp; 245 } else { 246 // reference must now be to a provider created in another thread. 247 temp.close(); 248 metaProvider = this.metaProvider.get(); 249 } 250 } 251 return metaProvider.getWAL(identifier); 252 } 253 createReader(final FileSystem fs, final Path path)254 public Reader createReader(final FileSystem fs, final Path path) throws IOException { 255 return createReader(fs, path, (CancelableProgressable)null); 256 } 257 258 /** 259 * Create a reader for the WAL. If you are reading from a file that's being written to and need 260 * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method 261 * then just seek back to the last known good position. 262 * @return A WAL reader. Close when done with it. 263 * @throws IOException 264 */ createReader(final FileSystem fs, final Path path, CancelableProgressable reporter)265 public Reader createReader(final FileSystem fs, final Path path, 266 CancelableProgressable reporter) throws IOException { 267 return createReader(fs, path, reporter, true); 268 } 269 createReader(final FileSystem fs, final Path path, CancelableProgressable reporter, boolean allowCustom)270 public Reader createReader(final FileSystem fs, final Path path, 271 CancelableProgressable reporter, boolean allowCustom) 272 throws IOException { 273 Class<? extends DefaultWALProvider.Reader> lrClass = 274 allowCustom ? logReaderClass : ProtobufLogReader.class; 275 276 try { 277 // A wal file could be under recovery, so it may take several 278 // tries to get it open. Instead of claiming it is corrupted, retry 279 // to open it up to 5 minutes by default. 280 long startWaiting = EnvironmentEdgeManager.currentTime(); 281 long openTimeout = timeoutMillis + startWaiting; 282 int nbAttempt = 0; 283 FSDataInputStream stream = null; 284 while (true) { 285 try { 286 if (lrClass != ProtobufLogReader.class) { 287 // User is overriding the WAL reader, let them. 288 DefaultWALProvider.Reader reader = lrClass.newInstance(); 289 reader.init(fs, path, conf, null); 290 return reader; 291 } else { 292 stream = fs.open(path); 293 // Note that zero-length file will fail to read PB magic, and attempt to create 294 // a non-PB reader and fail the same way existing code expects it to. If we get 295 // rid of the old reader entirely, we need to handle 0-size files differently from 296 // merely non-PB files. 297 byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length]; 298 boolean isPbWal = (stream.read(magic) == magic.length) 299 && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC); 300 DefaultWALProvider.Reader reader = 301 isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader(); 302 reader.init(fs, path, conf, stream); 303 return reader; 304 } 305 } catch (IOException e) { 306 try { 307 if (stream != null) { 308 stream.close(); 309 } 310 } catch (IOException exception) { 311 LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); 312 LOG.debug("exception details", exception); 313 } 314 String msg = e.getMessage(); 315 if (msg != null && (msg.contains("Cannot obtain block length") 316 || msg.contains("Could not obtain the last block") 317 || msg.matches("Blocklist for [^ ]* has changed.*"))) { 318 if (++nbAttempt == 1) { 319 LOG.warn("Lease should have recovered. This is not expected. Will retry", e); 320 } 321 if (reporter != null && !reporter.progress()) { 322 throw new InterruptedIOException("Operation is cancelled"); 323 } 324 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { 325 LOG.error("Can't open after " + nbAttempt + " attempts and " 326 + (EnvironmentEdgeManager.currentTime() - startWaiting) 327 + "ms " + " for " + path); 328 } else { 329 try { 330 Thread.sleep(nbAttempt < 3 ? 500 : 1000); 331 continue; // retry 332 } catch (InterruptedException ie) { 333 InterruptedIOException iioe = new InterruptedIOException(); 334 iioe.initCause(ie); 335 throw iioe; 336 } 337 } 338 throw new LeaseNotRecoveredException(e); 339 } else { 340 throw e; 341 } 342 } 343 } 344 } catch (IOException ie) { 345 throw ie; 346 } catch (Exception e) { 347 throw new IOException("Cannot get log reader", e); 348 } 349 } 350 351 /** 352 * Create a writer for the WAL. 353 * should be package-private. public only for tests and 354 * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 355 * @return A WAL writer. Close when done with it. 356 * @throws IOException 357 */ createWALWriter(final FileSystem fs, final Path path)358 public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { 359 return DefaultWALProvider.createWriter(conf, fs, path, false); 360 } 361 362 /** 363 * should be package-private, visible for recovery testing. 364 * @return an overwritable writer for recovered edits. caller should close. 365 */ 366 @VisibleForTesting createRecoveredEditsWriter(final FileSystem fs, final Path path)367 public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) 368 throws IOException { 369 return DefaultWALProvider.createWriter(conf, fs, path, true); 370 } 371 372 // These static methods are currently used where it's impractical to 373 // untangle the reliance on state in the filesystem. They rely on singleton 374 // WALFactory that just provides Reader / Writers. 375 // For now, first Configuration object wins. Practically this just impacts the reader/writer class 376 private static final AtomicReference<WALFactory> singleton = new AtomicReference<WALFactory>(); 377 private static final String SINGLETON_ID = WALFactory.class.getName(); 378 379 // public only for FSHLog and UpgradeTo96 getInstance(Configuration configuration)380 public static WALFactory getInstance(Configuration configuration) { 381 WALFactory factory = singleton.get(); 382 if (null == factory) { 383 WALFactory temp = new WALFactory(configuration); 384 if (singleton.compareAndSet(null, temp)) { 385 factory = temp; 386 } else { 387 // someone else beat us to initializing 388 try { 389 temp.close(); 390 } catch (IOException exception) { 391 LOG.debug("failed to close temporary singleton. ignoring.", exception); 392 } 393 factory = singleton.get(); 394 } 395 } 396 return factory; 397 } 398 399 /** 400 * Create a reader for the given path, accept custom reader classes from conf. 401 * If you already have a WALFactory, you should favor the instance method. 402 * @return a WAL Reader, caller must close. 403 */ createReader(final FileSystem fs, final Path path, final Configuration configuration)404 public static Reader createReader(final FileSystem fs, final Path path, 405 final Configuration configuration) throws IOException { 406 return getInstance(configuration).createReader(fs, path); 407 } 408 409 /** 410 * Create a reader for the given path, accept custom reader classes from conf. 411 * If you already have a WALFactory, you should favor the instance method. 412 * @return a WAL Reader, caller must close. 413 */ createReader(final FileSystem fs, final Path path, final Configuration configuration, final CancelableProgressable reporter)414 static Reader createReader(final FileSystem fs, final Path path, 415 final Configuration configuration, final CancelableProgressable reporter) throws IOException { 416 return getInstance(configuration).createReader(fs, path, reporter); 417 } 418 419 /** 420 * Create a reader for the given path, ignore custom reader classes from conf. 421 * If you already have a WALFactory, you should favor the instance method. 422 * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} 423 * @return a WAL Reader, caller must close. 424 */ createReaderIgnoreCustomClass(final FileSystem fs, final Path path, final Configuration configuration)425 public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path, 426 final Configuration configuration) throws IOException { 427 return getInstance(configuration).createReader(fs, path, null, false); 428 } 429 430 /** 431 * If you already have a WALFactory, you should favor the instance method. 432 * @return a Writer that will overwrite files. Caller must close. 433 */ createRecoveredEditsWriter(final FileSystem fs, final Path path, final Configuration configuration)434 static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, 435 final Configuration configuration) 436 throws IOException { 437 return DefaultWALProvider.createWriter(configuration, fs, path, true); 438 } 439 440 /** 441 * If you already have a WALFactory, you should favor the instance method. 442 * @return a writer that won't overwrite files. Caller must close. 443 */ 444 @VisibleForTesting createWALWriter(final FileSystem fs, final Path path, final Configuration configuration)445 public static Writer createWALWriter(final FileSystem fs, final Path path, 446 final Configuration configuration) 447 throws IOException { 448 return DefaultWALProvider.createWriter(configuration, fs, path, false); 449 } 450 } 451