1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.namenode.ha; 19 20 import static org.apache.hadoop.util.Time.monotonicNow; 21 22 import java.io.IOException; 23 import java.net.URI; 24 import java.net.URL; 25 import java.security.PrivilegedAction; 26 import java.util.concurrent.Callable; 27 import java.util.concurrent.ExecutionException; 28 import java.util.concurrent.ExecutorService; 29 import java.util.concurrent.Executors; 30 import java.util.concurrent.Future; 31 import java.util.concurrent.ThreadFactory; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.classification.InterfaceAudience; 36 import org.apache.hadoop.conf.Configuration; 37 import org.apache.hadoop.ha.ServiceFailedException; 38 import org.apache.hadoop.hdfs.DFSUtil; 39 import org.apache.hadoop.hdfs.HAUtil; 40 import org.apache.hadoop.hdfs.server.namenode.CheckpointConf; 41 import org.apache.hadoop.hdfs.server.namenode.FSImage; 42 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; 43 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; 44 import org.apache.hadoop.hdfs.server.namenode.NameNode; 45 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException; 46 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; 47 import org.apache.hadoop.hdfs.util.Canceler; 48 import org.apache.hadoop.security.SecurityUtil; 49 import org.apache.hadoop.security.UserGroupInformation; 50 51 import com.google.common.annotations.VisibleForTesting; 52 import com.google.common.base.Preconditions; 53 import com.google.common.util.concurrent.ThreadFactoryBuilder; 54 55 /** 56 * Thread which runs inside the NN when it's in Standby state, 57 * periodically waking up to take a checkpoint of the namespace. 58 * When it takes a checkpoint, it saves it to its local 59 * storage and then uploads it to the remote NameNode. 60 */ 61 @InterfaceAudience.Private 62 public class StandbyCheckpointer { 63 private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class); 64 private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L; 65 private final CheckpointConf checkpointConf; 66 private final Configuration conf; 67 private final FSNamesystem namesystem; 68 private long lastCheckpointTime; 69 private final CheckpointerThread thread; 70 private final ThreadFactory uploadThreadFactory; 71 private URL activeNNAddress; 72 private URL myNNAddress; 73 74 private final Object cancelLock = new Object(); 75 private Canceler canceler; 76 77 // Keep track of how many checkpoints were canceled. 78 // This is for use in tests. 79 private static int canceledCount = 0; 80 StandbyCheckpointer(Configuration conf, FSNamesystem ns)81 public StandbyCheckpointer(Configuration conf, FSNamesystem ns) 82 throws IOException { 83 this.namesystem = ns; 84 this.conf = conf; 85 this.checkpointConf = new CheckpointConf(conf); 86 this.thread = new CheckpointerThread(); 87 this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true) 88 .setNameFormat("TransferFsImageUpload-%d").build(); 89 90 setNameNodeAddresses(conf); 91 } 92 93 /** 94 * Determine the address of the NN we are checkpointing 95 * as well as our own HTTP address from the configuration. 96 * @throws IOException 97 */ setNameNodeAddresses(Configuration conf)98 private void setNameNodeAddresses(Configuration conf) throws IOException { 99 // Look up our own address. 100 myNNAddress = getHttpAddress(conf); 101 102 // Look up the active node's address 103 Configuration confForActive = HAUtil.getConfForOtherNode(conf); 104 activeNNAddress = getHttpAddress(confForActive); 105 106 // Sanity-check. 107 Preconditions.checkArgument(checkAddress(activeNNAddress), 108 "Bad address for active NN: %s", activeNNAddress); 109 Preconditions.checkArgument(checkAddress(myNNAddress), 110 "Bad address for standby NN: %s", myNNAddress); 111 } 112 getHttpAddress(Configuration conf)113 private URL getHttpAddress(Configuration conf) throws IOException { 114 final String scheme = DFSUtil.getHttpClientScheme(conf); 115 String defaultHost = NameNode.getServiceAddress(conf, true).getHostName(); 116 URI addr = DFSUtil.getInfoServerWithDefaultHost(defaultHost, conf, scheme); 117 return addr.toURL(); 118 } 119 120 /** 121 * Ensure that the given address is valid and has a port 122 * specified. 123 */ checkAddress(URL addr)124 private static boolean checkAddress(URL addr) { 125 return addr.getPort() != 0; 126 } 127 start()128 public void start() { 129 LOG.info("Starting standby checkpoint thread...\n" + 130 "Checkpointing active NN at " + activeNNAddress + "\n" + 131 "Serving checkpoints at " + myNNAddress); 132 thread.start(); 133 } 134 stop()135 public void stop() throws IOException { 136 cancelAndPreventCheckpoints("Stopping checkpointer"); 137 thread.setShouldRun(false); 138 thread.interrupt(); 139 try { 140 thread.join(); 141 } catch (InterruptedException e) { 142 LOG.warn("Edit log tailer thread exited with an exception"); 143 throw new IOException(e); 144 } 145 } 146 triggerRollbackCheckpoint()147 public void triggerRollbackCheckpoint() { 148 thread.interrupt(); 149 } 150 doCheckpoint()151 private void doCheckpoint() throws InterruptedException, IOException { 152 assert canceler != null; 153 final long txid; 154 final NameNodeFile imageType; 155 156 // Acquire cpLock to make sure no one is modifying the name system. 157 // It does not need the full namesystem write lock, since the only thing 158 // that modifies namesystem on standby node is edit log replaying. 159 namesystem.cpLockInterruptibly(); 160 try { 161 assert namesystem.getEditLog().isOpenForRead() : 162 "Standby Checkpointer should only attempt a checkpoint when " + 163 "NN is in standby mode, but the edit logs are in an unexpected state"; 164 165 FSImage img = namesystem.getFSImage(); 166 167 long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId(); 168 long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId(); 169 assert thisCheckpointTxId >= prevCheckpointTxId; 170 if (thisCheckpointTxId == prevCheckpointTxId) { 171 LOG.info("A checkpoint was triggered but the Standby Node has not " + 172 "received any transactions since the last checkpoint at txid " + 173 thisCheckpointTxId + ". Skipping..."); 174 return; 175 } 176 177 if (namesystem.isRollingUpgrade() 178 && !namesystem.getFSImage().hasRollbackFSImage()) { 179 // if we will do rolling upgrade but have not created the rollback image 180 // yet, name this checkpoint as fsimage_rollback 181 imageType = NameNodeFile.IMAGE_ROLLBACK; 182 } else { 183 imageType = NameNodeFile.IMAGE; 184 } 185 img.saveNamespace(namesystem, imageType, canceler); 186 txid = img.getStorage().getMostRecentCheckpointTxId(); 187 assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" + 188 thisCheckpointTxId + " but instead saved at txid=" + txid; 189 190 // Save the legacy OIV image, if the output dir is defined. 191 String outputDir = checkpointConf.getLegacyOivImageDir(); 192 if (outputDir != null && !outputDir.isEmpty()) { 193 img.saveLegacyOIVImage(namesystem, outputDir, canceler); 194 } 195 } finally { 196 namesystem.cpUnlock(); 197 } 198 199 // Upload the saved checkpoint back to the active 200 // Do this in a separate thread to avoid blocking transition to active 201 // See HDFS-4816 202 ExecutorService executor = 203 Executors.newSingleThreadExecutor(uploadThreadFactory); 204 Future<Void> upload = executor.submit(new Callable<Void>() { 205 @Override 206 public Void call() throws IOException { 207 TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, 208 namesystem.getFSImage().getStorage(), imageType, txid, canceler); 209 return null; 210 } 211 }); 212 executor.shutdown(); 213 try { 214 upload.get(); 215 } catch (InterruptedException e) { 216 // The background thread may be blocked waiting in the throttler, so 217 // interrupt it. 218 upload.cancel(true); 219 throw e; 220 } catch (ExecutionException e) { 221 throw new IOException("Exception during image upload: " + e.getMessage(), 222 e.getCause()); 223 } 224 } 225 226 /** 227 * Cancel any checkpoint that's currently being made, 228 * and prevent any new checkpoints from starting for the next 229 * minute or so. 230 */ cancelAndPreventCheckpoints(String msg)231 public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException { 232 synchronized (cancelLock) { 233 // The checkpointer thread takes this lock and checks if checkpointing is 234 // postponed. 235 thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS); 236 237 // Before beginning a checkpoint, the checkpointer thread 238 // takes this lock, and creates a canceler object. 239 // If the canceler is non-null, then a checkpoint is in 240 // progress and we need to cancel it. If it's null, then 241 // the operation has not started, meaning that the above 242 // time-based prevention will take effect. 243 if (canceler != null) { 244 canceler.cancel(msg); 245 } 246 } 247 } 248 249 @VisibleForTesting getCanceledCount()250 static int getCanceledCount() { 251 return canceledCount; 252 } 253 countUncheckpointedTxns()254 private long countUncheckpointedTxns() { 255 FSImage img = namesystem.getFSImage(); 256 return img.getLastAppliedOrWrittenTxId() - 257 img.getStorage().getMostRecentCheckpointTxId(); 258 } 259 260 private class CheckpointerThread extends Thread { 261 private volatile boolean shouldRun = true; 262 private volatile long preventCheckpointsUntil = 0; 263 CheckpointerThread()264 private CheckpointerThread() { 265 super("Standby State Checkpointer"); 266 } 267 setShouldRun(boolean shouldRun)268 private void setShouldRun(boolean shouldRun) { 269 this.shouldRun = shouldRun; 270 } 271 272 @Override run()273 public void run() { 274 // We have to make sure we're logged in as far as JAAS 275 // is concerned, in order to use kerberized SSL properly. 276 SecurityUtil.doAsLoginUserOrFatal( 277 new PrivilegedAction<Object>() { 278 @Override 279 public Object run() { 280 doWork(); 281 return null; 282 } 283 }); 284 } 285 286 /** 287 * Prevent checkpoints from occurring for some time period 288 * in the future. This is used when preparing to enter active 289 * mode. We need to not only cancel any concurrent checkpoint, 290 * but also prevent any checkpoints from racing to start just 291 * after the cancel call. 292 * 293 * @param delayMs the number of MS for which checkpoints will be 294 * prevented 295 */ preventCheckpointsFor(long delayMs)296 private void preventCheckpointsFor(long delayMs) { 297 preventCheckpointsUntil = monotonicNow() + delayMs; 298 } 299 doWork()300 private void doWork() { 301 final long checkPeriod = 1000 * checkpointConf.getCheckPeriod(); 302 // Reset checkpoint time so that we don't always checkpoint 303 // on startup. 304 lastCheckpointTime = monotonicNow(); 305 while (shouldRun) { 306 boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage(); 307 if (!needRollbackCheckpoint) { 308 try { 309 Thread.sleep(checkPeriod); 310 } catch (InterruptedException ie) { 311 } 312 if (!shouldRun) { 313 break; 314 } 315 } 316 try { 317 // We may have lost our ticket since last checkpoint, log in again, just in case 318 if (UserGroupInformation.isSecurityEnabled()) { 319 UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab(); 320 } 321 322 final long now = monotonicNow(); 323 final long uncheckpointed = countUncheckpointedTxns(); 324 final long secsSinceLast = (now - lastCheckpointTime) / 1000; 325 326 boolean needCheckpoint = needRollbackCheckpoint; 327 if (needCheckpoint) { 328 LOG.info("Triggering a rollback fsimage for rolling upgrade."); 329 } else if (uncheckpointed >= checkpointConf.getTxnCount()) { 330 LOG.info("Triggering checkpoint because there have been " + 331 uncheckpointed + " txns since the last checkpoint, which " + 332 "exceeds the configured threshold " + 333 checkpointConf.getTxnCount()); 334 needCheckpoint = true; 335 } else if (secsSinceLast >= checkpointConf.getPeriod()) { 336 LOG.info("Triggering checkpoint because it has been " + 337 secsSinceLast + " seconds since the last checkpoint, which " + 338 "exceeds the configured interval " + checkpointConf.getPeriod()); 339 needCheckpoint = true; 340 } 341 342 synchronized (cancelLock) { 343 if (now < preventCheckpointsUntil) { 344 LOG.info("But skipping this checkpoint since we are about to failover!"); 345 canceledCount++; 346 continue; 347 } 348 assert canceler == null; 349 canceler = new Canceler(); 350 } 351 352 if (needCheckpoint) { 353 doCheckpoint(); 354 // reset needRollbackCheckpoint to false only when we finish a ckpt 355 // for rollback image 356 if (needRollbackCheckpoint 357 && namesystem.getFSImage().hasRollbackFSImage()) { 358 namesystem.setCreatedRollbackImages(true); 359 namesystem.setNeedRollbackFsImage(false); 360 } 361 lastCheckpointTime = now; 362 } 363 } catch (SaveNamespaceCancelledException ce) { 364 LOG.info("Checkpoint was cancelled: " + ce.getMessage()); 365 canceledCount++; 366 } catch (InterruptedException ie) { 367 LOG.info("Interrupted during checkpointing", ie); 368 // Probably requested shutdown. 369 continue; 370 } catch (Throwable t) { 371 LOG.error("Exception in doCheckpoint", t); 372 } finally { 373 synchronized (cancelLock) { 374 canceler = null; 375 } 376 } 377 } 378 } 379 } 380 381 @VisibleForTesting getActiveNNAddress()382 URL getActiveNNAddress() { 383 return activeNNAddress; 384 } 385 } 386