1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.namenode.ha;
19 
20 import static org.apache.hadoop.util.Time.monotonicNow;
21 
22 import java.io.IOException;
23 import java.net.URI;
24 import java.net.URL;
25 import java.security.PrivilegedAction;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadFactory;
32 
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.classification.InterfaceAudience;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.ha.ServiceFailedException;
38 import org.apache.hadoop.hdfs.DFSUtil;
39 import org.apache.hadoop.hdfs.HAUtil;
40 import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
41 import org.apache.hadoop.hdfs.server.namenode.FSImage;
42 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
43 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
44 import org.apache.hadoop.hdfs.server.namenode.NameNode;
45 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
46 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
47 import org.apache.hadoop.hdfs.util.Canceler;
48 import org.apache.hadoop.security.SecurityUtil;
49 import org.apache.hadoop.security.UserGroupInformation;
50 
51 import com.google.common.annotations.VisibleForTesting;
52 import com.google.common.base.Preconditions;
53 import com.google.common.util.concurrent.ThreadFactoryBuilder;
54 
55 /**
56  * Thread which runs inside the NN when it's in Standby state,
57  * periodically waking up to take a checkpoint of the namespace.
58  * When it takes a checkpoint, it saves it to its local
59  * storage and then uploads it to the remote NameNode.
60  */
61 @InterfaceAudience.Private
62 public class StandbyCheckpointer {
63   private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
64   private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
65   private final CheckpointConf checkpointConf;
66   private final Configuration conf;
67   private final FSNamesystem namesystem;
68   private long lastCheckpointTime;
69   private final CheckpointerThread thread;
70   private final ThreadFactory uploadThreadFactory;
71   private URL activeNNAddress;
72   private URL myNNAddress;
73 
74   private final Object cancelLock = new Object();
75   private Canceler canceler;
76 
77   // Keep track of how many checkpoints were canceled.
78   // This is for use in tests.
79   private static int canceledCount = 0;
80 
StandbyCheckpointer(Configuration conf, FSNamesystem ns)81   public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
82       throws IOException {
83     this.namesystem = ns;
84     this.conf = conf;
85     this.checkpointConf = new CheckpointConf(conf);
86     this.thread = new CheckpointerThread();
87     this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
88         .setNameFormat("TransferFsImageUpload-%d").build();
89 
90     setNameNodeAddresses(conf);
91   }
92 
93   /**
94    * Determine the address of the NN we are checkpointing
95    * as well as our own HTTP address from the configuration.
96    * @throws IOException
97    */
setNameNodeAddresses(Configuration conf)98   private void setNameNodeAddresses(Configuration conf) throws IOException {
99     // Look up our own address.
100     myNNAddress = getHttpAddress(conf);
101 
102     // Look up the active node's address
103     Configuration confForActive = HAUtil.getConfForOtherNode(conf);
104     activeNNAddress = getHttpAddress(confForActive);
105 
106     // Sanity-check.
107     Preconditions.checkArgument(checkAddress(activeNNAddress),
108         "Bad address for active NN: %s", activeNNAddress);
109     Preconditions.checkArgument(checkAddress(myNNAddress),
110         "Bad address for standby NN: %s", myNNAddress);
111   }
112 
getHttpAddress(Configuration conf)113   private URL getHttpAddress(Configuration conf) throws IOException {
114     final String scheme = DFSUtil.getHttpClientScheme(conf);
115     String defaultHost = NameNode.getServiceAddress(conf, true).getHostName();
116     URI addr = DFSUtil.getInfoServerWithDefaultHost(defaultHost, conf, scheme);
117     return addr.toURL();
118   }
119 
120   /**
121    * Ensure that the given address is valid and has a port
122    * specified.
123    */
checkAddress(URL addr)124   private static boolean checkAddress(URL addr) {
125     return addr.getPort() != 0;
126   }
127 
start()128   public void start() {
129     LOG.info("Starting standby checkpoint thread...\n" +
130         "Checkpointing active NN at " + activeNNAddress + "\n" +
131         "Serving checkpoints at " + myNNAddress);
132     thread.start();
133   }
134 
stop()135   public void stop() throws IOException {
136     cancelAndPreventCheckpoints("Stopping checkpointer");
137     thread.setShouldRun(false);
138     thread.interrupt();
139     try {
140       thread.join();
141     } catch (InterruptedException e) {
142       LOG.warn("Edit log tailer thread exited with an exception");
143       throw new IOException(e);
144     }
145   }
146 
triggerRollbackCheckpoint()147   public void triggerRollbackCheckpoint() {
148     thread.interrupt();
149   }
150 
doCheckpoint()151   private void doCheckpoint() throws InterruptedException, IOException {
152     assert canceler != null;
153     final long txid;
154     final NameNodeFile imageType;
155 
156     // Acquire cpLock to make sure no one is modifying the name system.
157     // It does not need the full namesystem write lock, since the only thing
158     // that modifies namesystem on standby node is edit log replaying.
159     namesystem.cpLockInterruptibly();
160     try {
161       assert namesystem.getEditLog().isOpenForRead() :
162         "Standby Checkpointer should only attempt a checkpoint when " +
163         "NN is in standby mode, but the edit logs are in an unexpected state";
164 
165       FSImage img = namesystem.getFSImage();
166 
167       long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
168       long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();
169       assert thisCheckpointTxId >= prevCheckpointTxId;
170       if (thisCheckpointTxId == prevCheckpointTxId) {
171         LOG.info("A checkpoint was triggered but the Standby Node has not " +
172             "received any transactions since the last checkpoint at txid " +
173             thisCheckpointTxId + ". Skipping...");
174         return;
175       }
176 
177       if (namesystem.isRollingUpgrade()
178           && !namesystem.getFSImage().hasRollbackFSImage()) {
179         // if we will do rolling upgrade but have not created the rollback image
180         // yet, name this checkpoint as fsimage_rollback
181         imageType = NameNodeFile.IMAGE_ROLLBACK;
182       } else {
183         imageType = NameNodeFile.IMAGE;
184       }
185       img.saveNamespace(namesystem, imageType, canceler);
186       txid = img.getStorage().getMostRecentCheckpointTxId();
187       assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
188         thisCheckpointTxId + " but instead saved at txid=" + txid;
189 
190       // Save the legacy OIV image, if the output dir is defined.
191       String outputDir = checkpointConf.getLegacyOivImageDir();
192       if (outputDir != null && !outputDir.isEmpty()) {
193         img.saveLegacyOIVImage(namesystem, outputDir, canceler);
194       }
195     } finally {
196       namesystem.cpUnlock();
197     }
198 
199     // Upload the saved checkpoint back to the active
200     // Do this in a separate thread to avoid blocking transition to active
201     // See HDFS-4816
202     ExecutorService executor =
203         Executors.newSingleThreadExecutor(uploadThreadFactory);
204     Future<Void> upload = executor.submit(new Callable<Void>() {
205       @Override
206       public Void call() throws IOException {
207         TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
208             namesystem.getFSImage().getStorage(), imageType, txid, canceler);
209         return null;
210       }
211     });
212     executor.shutdown();
213     try {
214       upload.get();
215     } catch (InterruptedException e) {
216       // The background thread may be blocked waiting in the throttler, so
217       // interrupt it.
218       upload.cancel(true);
219       throw e;
220     } catch (ExecutionException e) {
221       throw new IOException("Exception during image upload: " + e.getMessage(),
222           e.getCause());
223     }
224   }
225 
226   /**
227    * Cancel any checkpoint that's currently being made,
228    * and prevent any new checkpoints from starting for the next
229    * minute or so.
230    */
cancelAndPreventCheckpoints(String msg)231   public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
232     synchronized (cancelLock) {
233       // The checkpointer thread takes this lock and checks if checkpointing is
234       // postponed.
235       thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
236 
237       // Before beginning a checkpoint, the checkpointer thread
238       // takes this lock, and creates a canceler object.
239       // If the canceler is non-null, then a checkpoint is in
240       // progress and we need to cancel it. If it's null, then
241       // the operation has not started, meaning that the above
242       // time-based prevention will take effect.
243       if (canceler != null) {
244         canceler.cancel(msg);
245       }
246     }
247   }
248 
249   @VisibleForTesting
getCanceledCount()250   static int getCanceledCount() {
251     return canceledCount;
252   }
253 
countUncheckpointedTxns()254   private long countUncheckpointedTxns() {
255     FSImage img = namesystem.getFSImage();
256     return img.getLastAppliedOrWrittenTxId() -
257       img.getStorage().getMostRecentCheckpointTxId();
258   }
259 
260   private class CheckpointerThread extends Thread {
261     private volatile boolean shouldRun = true;
262     private volatile long preventCheckpointsUntil = 0;
263 
CheckpointerThread()264     private CheckpointerThread() {
265       super("Standby State Checkpointer");
266     }
267 
setShouldRun(boolean shouldRun)268     private void setShouldRun(boolean shouldRun) {
269       this.shouldRun = shouldRun;
270     }
271 
272     @Override
run()273     public void run() {
274       // We have to make sure we're logged in as far as JAAS
275       // is concerned, in order to use kerberized SSL properly.
276       SecurityUtil.doAsLoginUserOrFatal(
277           new PrivilegedAction<Object>() {
278           @Override
279           public Object run() {
280             doWork();
281             return null;
282           }
283         });
284     }
285 
286     /**
287      * Prevent checkpoints from occurring for some time period
288      * in the future. This is used when preparing to enter active
289      * mode. We need to not only cancel any concurrent checkpoint,
290      * but also prevent any checkpoints from racing to start just
291      * after the cancel call.
292      *
293      * @param delayMs the number of MS for which checkpoints will be
294      * prevented
295      */
preventCheckpointsFor(long delayMs)296     private void preventCheckpointsFor(long delayMs) {
297       preventCheckpointsUntil = monotonicNow() + delayMs;
298     }
299 
doWork()300     private void doWork() {
301       final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();
302       // Reset checkpoint time so that we don't always checkpoint
303       // on startup.
304       lastCheckpointTime = monotonicNow();
305       while (shouldRun) {
306         boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
307         if (!needRollbackCheckpoint) {
308           try {
309             Thread.sleep(checkPeriod);
310           } catch (InterruptedException ie) {
311           }
312           if (!shouldRun) {
313             break;
314           }
315         }
316         try {
317           // We may have lost our ticket since last checkpoint, log in again, just in case
318           if (UserGroupInformation.isSecurityEnabled()) {
319             UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
320           }
321 
322           final long now = monotonicNow();
323           final long uncheckpointed = countUncheckpointedTxns();
324           final long secsSinceLast = (now - lastCheckpointTime) / 1000;
325 
326           boolean needCheckpoint = needRollbackCheckpoint;
327           if (needCheckpoint) {
328             LOG.info("Triggering a rollback fsimage for rolling upgrade.");
329           } else if (uncheckpointed >= checkpointConf.getTxnCount()) {
330             LOG.info("Triggering checkpoint because there have been " +
331                 uncheckpointed + " txns since the last checkpoint, which " +
332                 "exceeds the configured threshold " +
333                 checkpointConf.getTxnCount());
334             needCheckpoint = true;
335           } else if (secsSinceLast >= checkpointConf.getPeriod()) {
336             LOG.info("Triggering checkpoint because it has been " +
337                 secsSinceLast + " seconds since the last checkpoint, which " +
338                 "exceeds the configured interval " + checkpointConf.getPeriod());
339             needCheckpoint = true;
340           }
341 
342           synchronized (cancelLock) {
343             if (now < preventCheckpointsUntil) {
344               LOG.info("But skipping this checkpoint since we are about to failover!");
345               canceledCount++;
346               continue;
347             }
348             assert canceler == null;
349             canceler = new Canceler();
350           }
351 
352           if (needCheckpoint) {
353             doCheckpoint();
354             // reset needRollbackCheckpoint to false only when we finish a ckpt
355             // for rollback image
356             if (needRollbackCheckpoint
357                 && namesystem.getFSImage().hasRollbackFSImage()) {
358               namesystem.setCreatedRollbackImages(true);
359               namesystem.setNeedRollbackFsImage(false);
360             }
361             lastCheckpointTime = now;
362           }
363         } catch (SaveNamespaceCancelledException ce) {
364           LOG.info("Checkpoint was cancelled: " + ce.getMessage());
365           canceledCount++;
366         } catch (InterruptedException ie) {
367           LOG.info("Interrupted during checkpointing", ie);
368           // Probably requested shutdown.
369           continue;
370         } catch (Throwable t) {
371           LOG.error("Exception in doCheckpoint", t);
372         } finally {
373           synchronized (cancelLock) {
374             canceler = null;
375           }
376         }
377       }
378     }
379   }
380 
381   @VisibleForTesting
getActiveNNAddress()382   URL getActiveNNAddress() {
383     return activeNNAddress;
384   }
385 }
386