1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 
20 
21 package org.apache.hadoop.hbase.wal;
22 
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.io.InterruptedIOException;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicReference;
29 
30 import com.google.common.annotations.VisibleForTesting;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FSDataInputStream;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.wal.WAL.Reader;
40 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
41 import org.apache.hadoop.hbase.util.CancelableProgressable;
42 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
44 
45 // imports for things that haven't moved from regionserver.wal yet.
46 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
47 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
48 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
49 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
50 
51 /**
52  * Entry point for users of the Write Ahead Log.
53  * Acts as the shim between internal use and the particular WALProvider we use to handle wal
54  * requests.
55  *
56  * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
57  * implementations:
58  * <ul>
59  *   <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
60  *                                  "filesystem"</li>
61  *   <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
62  *                             FileSystem interface, normally HDFS.</li>
63  *   <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
64  *                           server.</li>
65  * </ul>
66  *
 * Alternatively, you may provide a custom implementation of {@link WALProvider} by class name.
68  */
69 @InterfaceAudience.Private
70 public class WALFactory {
71 
72   private static final Log LOG = LogFactory.getLog(WALFactory.class);
73 
74   /**
75    * Maps between configuration names for providers and implementation classes.
76    */
77   static enum Providers {
78     defaultProvider(DefaultWALProvider.class),
79     filesystem(DefaultWALProvider.class),
80     multiwal(BoundedRegionGroupingProvider.class);
81 
82     Class<? extends WALProvider> clazz;
Providers(Class<? extends WALProvider> clazz)83     Providers(Class<? extends WALProvider> clazz) {
84       this.clazz = clazz;
85     }
86   }
87 
  /** Configuration key selecting the provider used for regular (non-meta) WALs. */
  public static final String WAL_PROVIDER = "hbase.wal.provider";
  /** Provider name used when {@link #WAL_PROVIDER} is not set. */
  static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();

  /** Configuration key selecting the provider used for meta WALs. */
  static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
  /** Provider name used when {@link #META_WAL_PROVIDER} is not set. */
  static final String DEFAULT_META_WAL_PROVIDER = Providers.defaultProvider.name();

  /** Identifier of this factory; the singleton instance uses a fixed id. */
  final String factoryId;
  /** Provider for regular WALs; null on the singleton instance, which only makes readers/writers. */
  final WALProvider provider;
  // The meta updates are written to a different wal. If this
  // regionserver holds meta regions, then this ref will be non-null.
  // Lazily initialized on the first getMetaWAL call; most RegionServers don't deal with META.
  final AtomicReference<WALProvider> metaProvider = new AtomicReference<WALProvider>();

  /**
   * Configuration-specified WAL Reader used when a custom reader is requested.
   * Read from "hbase.regionserver.hlog.reader.impl"; defaults to ProtobufLogReader.
   */
  private final Class<? extends DefaultWALProvider.Reader> logReaderClass;

  /**
   * How long, in milliseconds, to keep attempting to open in-recovery wals.
   * Read from "hbase.hlog.open.timeout"; defaults to 300000.
   */
  private final int timeoutMillis;

  /** Configuration retained so that readers and writers created later can read their settings. */
  private final Configuration conf;
112 
113   // Used for the singleton WALFactory, see below.
WALFactory(Configuration conf)114   private WALFactory(Configuration conf) {
115     // this code is duplicated here so we can keep our members final.
116     // until we've moved reader/writer construction down into providers, this initialization must
117     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
118     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
119     /* TODO Both of these are probably specific to the fs wal provider */
120     logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
121         DefaultWALProvider.Reader.class);
122     this.conf = conf;
123     // end required early initialization
124 
125     // this instance can't create wals, just reader/writers.
126     provider = null;
127     factoryId = SINGLETON_ID;
128   }
129 
130   /**
131    * instantiate a provider from a config property.
132    * requires conf to have already been set (as well as anything the provider might need to read).
133    */
getProvider(final String key, final String defaultValue, final List<WALActionsListener> listeners, final String providerId)134   WALProvider getProvider(final String key, final String defaultValue,
135       final List<WALActionsListener> listeners, final String providerId) throws IOException {
136     Class<? extends WALProvider> clazz;
137     try {
138       clazz = Providers.valueOf(conf.get(key, defaultValue)).clazz;
139     } catch (IllegalArgumentException exception) {
140       // Fall back to them specifying a class name
141       // Note that the passed default class shouldn't actually be used, since the above only fails
142       // when there is a config value present.
143       clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
144     }
145     LOG.info("Instantiating WALProvider of type " + clazz);
146     try {
147       final WALProvider result = clazz.newInstance();
148       result.init(this, conf, listeners, providerId);
149       return result;
150     } catch (InstantiationException exception) {
151       LOG.error("couldn't set up WALProvider, check config key " + key);
152       LOG.debug("Exception details for failure to load WALProvider.", exception);
153       throw new IOException("couldn't set up WALProvider", exception);
154     } catch (IllegalAccessException exception) {
155       LOG.error("couldn't set up WALProvider, check config key " + key);
156       LOG.debug("Exception details for failure to load WALProvider.", exception);
157       throw new IOException("couldn't set up WALProvider", exception);
158     }
159   }
160 
161   /**
162    * @param conf must not be null, will keep a reference to read params in later reader/writer
163    *     instances.
164    * @param listeners may be null. will be given to all created wals (and not meta-wals)
165    * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
166    *     to make a directory
167    */
WALFactory(final Configuration conf, final List<WALActionsListener> listeners, final String factoryId)168   public WALFactory(final Configuration conf, final List<WALActionsListener> listeners,
169       final String factoryId) throws IOException {
170     // until we've moved reader/writer construction down into providers, this initialization must
171     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
172     timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
173     /* TODO Both of these are probably specific to the fs wal provider */
174     logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
175         DefaultWALProvider.Reader.class);
176     this.conf = conf;
177     this.factoryId = factoryId;
178     // end required early initialization
179     if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
180       provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, listeners, null);
181     } else {
182       // special handling of existing configuration behavior.
183       LOG.warn("Running with WAL disabled.");
184       provider = new DisabledWALProvider();
185       provider.init(this, conf, null, factoryId);
186     }
187   }
188 
189   /**
190    * Shutdown all WALs and clean up any underlying storage.
191    * Use only when you will not need to replay and edits that have gone to any wals from this
192    * factory.
193    */
close()194   public void close() throws IOException {
195     final WALProvider metaProvider = this.metaProvider.get();
196     if (null != metaProvider) {
197       metaProvider.close();
198     }
199     // close is called on a WALFactory with null provider in the case of contention handling
200     // within the getInstance method.
201     if (null != provider) {
202       provider.close();
203     }
204   }
205 
206   /**
207    * Tell the underlying WAL providers to shut down, but do not clean up underlying storage.
208    * If you are not ending cleanly and will need to replay edits from this factory's wals,
209    * use this method if you can as it will try to leave things as tidy as possible.
210    */
shutdown()211   public void shutdown() throws IOException {
212     IOException exception = null;
213     final WALProvider metaProvider = this.metaProvider.get();
214     if (null != metaProvider) {
215       try {
216         metaProvider.shutdown();
217       } catch(IOException ioe) {
218         exception = ioe;
219       }
220     }
221     provider.shutdown();
222     if (null != exception) {
223       throw exception;
224     }
225   }
226 
227   /**
228    * @param identifier may not be null, contents will not be altered
229    */
getWAL(final byte[] identifier)230   public WAL getWAL(final byte[] identifier) throws IOException {
231     return provider.getWAL(identifier);
232   }
233 
234   /**
235    * @param identifier may not be null, contents will not be altered
236    */
getMetaWAL(final byte[] identifier)237   public WAL getMetaWAL(final byte[] identifier) throws IOException {
238     WALProvider metaProvider = this.metaProvider.get();
239     if (null == metaProvider) {
240       final WALProvider temp = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER,
241           Collections.<WALActionsListener>singletonList(new MetricsWAL()),
242           DefaultWALProvider.META_WAL_PROVIDER_ID);
243       if (this.metaProvider.compareAndSet(null, temp)) {
244         metaProvider = temp;
245       } else {
246         // reference must now be to a provider created in another thread.
247         temp.close();
248         metaProvider = this.metaProvider.get();
249       }
250     }
251     return metaProvider.getWAL(identifier);
252   }
253 
createReader(final FileSystem fs, final Path path)254   public Reader createReader(final FileSystem fs, final Path path) throws IOException {
255     return createReader(fs, path, (CancelableProgressable)null);
256   }
257 
258   /**
259    * Create a reader for the WAL. If you are reading from a file that's being written to and need
260    * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
261    * then just seek back to the last known good position.
262    * @return A WAL reader.  Close when done with it.
263    * @throws IOException
264    */
createReader(final FileSystem fs, final Path path, CancelableProgressable reporter)265   public Reader createReader(final FileSystem fs, final Path path,
266       CancelableProgressable reporter) throws IOException {
267     return createReader(fs, path, reporter, true);
268   }
269 
  /**
   * Open a reader on a WAL file, retrying while the file looks like it is still under recovery.
   * <p>
   * When {@code allowCustom} is true and the configured reader class is not
   * {@code ProtobufLogReader}, that class is instantiated and initialized without a pre-opened
   * stream. Otherwise the file is opened and its leading bytes are compared with
   * {@code ProtobufLogReader.PB_WAL_MAGIC} to choose between a {@code ProtobufLogReader} and a
   * {@code SequenceFileLogReader}.
   * <p>
   * Only IOExceptions whose message matches one of a few known "block not available yet"
   * patterns are retried; any other IOException is rethrown immediately.
   * @param fs file system used to open {@code path}
   * @param path location of the WAL file
   * @param reporter may be null; when non-null, a false return from {@code progress()} aborts
   *     the retries with an InterruptedIOException
   * @param allowCustom whether the configured reader class may be used instead of the
   *     built-in detection
   * @return A WAL reader.  Close when done with it.
   * @throws IOException if the reader cannot be created; non-IOException failures are wrapped
   */
  public Reader createReader(final FileSystem fs, final Path path,
      CancelableProgressable reporter, boolean allowCustom)
      throws IOException {
    // Ignore the configured reader class entirely when custom readers are not allowed.
    Class<? extends DefaultWALProvider.Reader> lrClass =
        allowCustom ? logReaderClass : ProtobufLogReader.class;

    try {
      // A wal file could be under recovery, so it may take several
      // tries to get it open. Instead of claiming it is corrupted, retry
      // to open it up to 5 minutes by default.
      long startWaiting = EnvironmentEdgeManager.currentTime();
      // absolute deadline after which (given enough attempts) we stop retrying.
      long openTimeout = timeoutMillis + startWaiting;
      int nbAttempt = 0;
      FSDataInputStream stream = null;
      while (true) {
        try {
          if (lrClass != ProtobufLogReader.class) {
            // User is overriding the WAL reader, let them.
            DefaultWALProvider.Reader reader = lrClass.newInstance();
            reader.init(fs, path, conf, null);
            return reader;
          } else {
            stream = fs.open(path);
            // Note that zero-length file will fail to read PB magic, and attempt to create
            // a non-PB reader and fail the same way existing code expects it to. If we get
            // rid of the old reader entirely, we need to handle 0-size files differently from
            // merely non-PB files.
            byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
            // PB only if a full magic-length read succeeded and the bytes match.
            boolean isPbWal = (stream.read(magic) == magic.length)
                && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
            DefaultWALProvider.Reader reader =
                isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
            // the already-opened stream is handed to the reader.
            reader.init(fs, path, conf, stream);
            return reader;
          }
        } catch (IOException e) {
          // Best-effort close of whatever stream was opened; a close failure is only logged.
          try {
            if (stream != null) {
              stream.close();
            }
          } catch (IOException exception) {
            LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
            LOG.debug("exception details", exception);
          }
          String msg = e.getMessage();
          // Only these messages are treated as "still recovering" and retried.
          if (msg != null && (msg.contains("Cannot obtain block length")
              || msg.contains("Could not obtain the last block")
              || msg.matches("Blocklist for [^ ]* has changed.*"))) {
            // warn once, on the first failed attempt.
            if (++nbAttempt == 1) {
              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
            }
            // give the caller a chance to cancel between attempts.
            if (reporter != null && !reporter.progress()) {
              throw new InterruptedIOException("Operation is cancelled");
            }
            // give up only after more than two attempts AND once the deadline has passed.
            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
              LOG.error("Can't open after " + nbAttempt + " attempts and "
                + (EnvironmentEdgeManager.currentTime() - startWaiting)
                + "ms " + " for " + path);
            } else {
              try {
                // 500ms pause for the first two attempts, 1000ms afterwards.
                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
                continue; // retry
              } catch (InterruptedException ie) {
                InterruptedIOException iioe = new InterruptedIOException();
                iioe.initCause(ie);
                throw iioe;
              }
            }
            // reached only on the give-up path above.
            throw new LeaseNotRecoveredException(e);
          } else {
            throw e;
          }
        }
      }
    } catch (IOException ie) {
      // IOExceptions pass through unchanged.
      throw ie;
    } catch (Exception e) {
      // anything else (e.g. reflective instantiation failures) is wrapped.
      throw new IOException("Cannot get log reader", e);
    }
  }
350 
351   /**
352    * Create a writer for the WAL.
353    * should be package-private. public only for tests and
354    * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
355    * @return A WAL writer.  Close when done with it.
356    * @throws IOException
357    */
createWALWriter(final FileSystem fs, final Path path)358   public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
359     return DefaultWALProvider.createWriter(conf, fs, path, false);
360   }
361 
362   /**
363    * should be package-private, visible for recovery testing.
364    * @return an overwritable writer for recovered edits. caller should close.
365    */
366   @VisibleForTesting
createRecoveredEditsWriter(final FileSystem fs, final Path path)367   public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
368       throws IOException {
369     return DefaultWALProvider.createWriter(conf, fs, path, true);
370   }
371 
  // These static methods are currently used where it's impractical to
  // untangle the reliance on state in the filesystem. They rely on a singleton
  // WALFactory that just provides Readers / Writers.
  // For now, the first Configuration object wins. Practically this just impacts the
  // reader/writer class.
  // Holds the lazily created singleton; populated via compare-and-set in getInstance.
  private static final AtomicReference<WALFactory> singleton = new AtomicReference<WALFactory>();
  // Factory id assigned to the singleton instance.
  private static final String SINGLETON_ID = WALFactory.class.getName();
378 
379   // public only for FSHLog and UpgradeTo96
getInstance(Configuration configuration)380   public static WALFactory getInstance(Configuration configuration) {
381     WALFactory factory = singleton.get();
382     if (null == factory) {
383       WALFactory temp = new WALFactory(configuration);
384       if (singleton.compareAndSet(null, temp)) {
385         factory = temp;
386       } else {
387         // someone else beat us to initializing
388         try {
389           temp.close();
390         } catch (IOException exception) {
391           LOG.debug("failed to close temporary singleton. ignoring.", exception);
392         }
393         factory = singleton.get();
394       }
395     }
396     return factory;
397   }
398 
399   /**
400    * Create a reader for the given path, accept custom reader classes from conf.
401    * If you already have a WALFactory, you should favor the instance method.
402    * @return a WAL Reader, caller must close.
403    */
createReader(final FileSystem fs, final Path path, final Configuration configuration)404   public static Reader createReader(final FileSystem fs, final Path path,
405       final Configuration configuration) throws IOException {
406     return getInstance(configuration).createReader(fs, path);
407   }
408 
409   /**
410    * Create a reader for the given path, accept custom reader classes from conf.
411    * If you already have a WALFactory, you should favor the instance method.
412    * @return a WAL Reader, caller must close.
413    */
createReader(final FileSystem fs, final Path path, final Configuration configuration, final CancelableProgressable reporter)414   static Reader createReader(final FileSystem fs, final Path path,
415       final Configuration configuration, final CancelableProgressable reporter) throws IOException {
416     return getInstance(configuration).createReader(fs, path, reporter);
417   }
418 
419   /**
420    * Create a reader for the given path, ignore custom reader classes from conf.
421    * If you already have a WALFactory, you should favor the instance method.
422    * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
423    * @return a WAL Reader, caller must close.
424    */
createReaderIgnoreCustomClass(final FileSystem fs, final Path path, final Configuration configuration)425   public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
426       final Configuration configuration) throws IOException {
427     return getInstance(configuration).createReader(fs, path, null, false);
428   }
429 
430   /**
431    * If you already have a WALFactory, you should favor the instance method.
432    * @return a Writer that will overwrite files. Caller must close.
433    */
createRecoveredEditsWriter(final FileSystem fs, final Path path, final Configuration configuration)434   static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
435       final Configuration configuration)
436       throws IOException {
437     return DefaultWALProvider.createWriter(configuration, fs, path, true);
438   }
439 
440   /**
441    * If you already have a WALFactory, you should favor the instance method.
442    * @return a writer that won't overwrite files. Caller must close.
443    */
444   @VisibleForTesting
createWALWriter(final FileSystem fs, final Path path, final Configuration configuration)445   public static Writer createWALWriter(final FileSystem fs, final Path path,
446       final Configuration configuration)
447       throws IOException {
448     return DefaultWALProvider.createWriter(configuration, fs, path, false);
449   }
450 }
451