1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.procedure; 19 20 import java.io.IOException; 21 import java.util.concurrent.Callable; 22 import java.util.concurrent.CountDownLatch; 23 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 import org.apache.hadoop.hbase.errorhandling.ForeignException; 27 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 28 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener; 29 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; 30 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector; 31 import org.apache.zookeeper.KeeperException; 32 33 /** 34 * Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator 35 * which communicates with ProcedureMembers who create and start its part of the Procedure. This 36 * sub part is called a Subprocedure 37 * 38 * Users should subclass this and implement {@link #acquireBarrier()} (get local barrier for this 39 * member), {@link #insideBarrier()} (execute while globally barriered and release barrier) and 40 * {@link #cleanup(Exception)} (release state associated with subprocedure.) 41 * 42 * When submitted to a ProcedureMemeber, the call method is executed in a separate thread. 43 * Latches are use too block its progress and trigger continuations when barrier conditions are 44 * met. 45 * 46 * Exception that makes it out of calls to {@link #acquireBarrier()} or {@link #insideBarrier()} 47 * gets converted into {@link ForeignException}, which will get propagated to the 48 * {@link ProcedureCoordinator}. 49 * 50 * There is a category of procedure (ex: online-snapshots), and a user-specified instance-specific 51 * barrierName. (ex: snapshot121126). 52 */ 53 abstract public class Subprocedure implements Callable<Void> { 54 private static final Log LOG = LogFactory.getLog(Subprocedure.class); 55 56 // Name of the procedure 57 final private String barrierName; 58 59 // 60 // Execution state 61 // 62 63 /** wait on before allowing the in barrier phase to proceed */ 64 private final CountDownLatch inGlobalBarrier; 65 /** counted down when the Subprocedure has completed */ 66 private final CountDownLatch releasedLocalBarrier; 67 68 // 69 // Error handling 70 // 71 /** monitor to check for errors */ 72 protected final ForeignExceptionDispatcher monitor; 73 /** frequency to check for errors (ms) */ 74 protected final long wakeFrequency; 75 protected final TimeoutExceptionInjector executionTimeoutTimer; 76 protected final ProcedureMemberRpcs rpcs; 77 78 private volatile boolean complete = false; 79 80 /** 81 * @param member reference to the member managing this subprocedure 82 * @param procName name of the procedure this subprocedure is associated with 83 * @param monitor notified if there is an error in the subprocedure 84 * @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in 85 * milliseconds). 86 * @param timeout time in millis that will trigger a subprocedure abort if it has not completed 87 */ Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout)88 public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor, 89 long wakeFrequency, long timeout) { 90 // Asserts should be caught during unit testing 91 assert member != null : "procedure member should be non-null"; 92 assert member.getRpcs() != null : "rpc handlers should be non-null"; 93 assert procName != null : "procedure name should be non-null"; 94 assert monitor != null : "monitor should be non-null"; 95 96 // Default to a very large timeout 97 this.rpcs = member.getRpcs(); 98 this.barrierName = procName; 99 this.monitor = monitor; 100 // forward any failures to coordinator. Since this is a dispatcher, resend loops should not be 101 // possible. 102 this.monitor.addListener(new ForeignExceptionListener() { 103 @Override 104 public void receive(ForeignException ee) { 105 // if this is a notification from a remote source, just log 106 if (ee.isRemote()) { 107 LOG.debug("Was remote foreign exception, not redispatching error", ee); 108 return; 109 } 110 // if this is a local KeeperException, don't attempt to notify other members 111 if (ee.getCause() instanceof KeeperException) { 112 LOG.debug("Was KeeperException, not redispatching error", ee); 113 return; 114 } 115 // if it is other local error, then send it to the coordinator 116 try { 117 rpcs.sendMemberAborted(Subprocedure.this, ee); 118 } catch (IOException e) { 119 // this will fail all the running procedures, since the connection is down 120 LOG.error("Can't reach controller, not propagating error", e); 121 } 122 } 123 }); 124 125 this.wakeFrequency = wakeFrequency; 126 this.inGlobalBarrier = new CountDownLatch(1); 127 this.releasedLocalBarrier = new CountDownLatch(1); 128 129 // accept error from timer thread, this needs to be started. 130 this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout); 131 } 132 getName()133 public String getName() { 134 return barrierName; 135 } 136 getMemberName()137 public String getMemberName() { 138 return rpcs.getMemberName(); 139 } 140 rethrowException()141 private void rethrowException() throws ForeignException { 142 monitor.rethrowException(); 143 } 144 145 /** 146 * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods 147 * while keeping some state for other threads to access. 148 * 149 * This would normally be executed by the ProcedureMemeber when a acquire message comes from the 150 * coordinator. Rpcs are used to spend message back to the coordinator after different phases 151 * are executed. Any exceptions caught during the execution (except for InterruptedException) get 152 * converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendMemberAborted( 153 * Subprocedure, ForeignException)}. 154 */ 155 @SuppressWarnings("finally") call()156 final public Void call() { 157 LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " + 158 executionTimeoutTimer.getMaxTime() + "ms"); 159 // start the execution timeout timer 160 executionTimeoutTimer.start(); 161 162 try { 163 // start by checking for error first 164 rethrowException(); 165 LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage"); 166 acquireBarrier(); 167 LOG.debug("Subprocedure '" + barrierName + "' locally acquired"); 168 rethrowException(); 169 170 // vote yes to coordinator about being prepared 171 rpcs.sendMemberAcquired(this); 172 LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" + 173 " 'reached' or 'abort' from coordinator"); 174 175 // wait for the procedure to reach global barrier before proceding 176 waitForReachedGlobalBarrier(); 177 rethrowException(); // if Coordinator aborts, will bail from here with exception 178 179 // In traditional 2PC, if a member reaches this state the TX has been committed and the 180 // member is responsible for rolling forward and recovering and completing the subsequent 181 // operations in the case of failure. It cannot rollback. 182 // 183 // This implementation is not 2PC since it can still rollback here, and thus has different 184 // semantics. 185 186 LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator."); 187 byte[] dataToCoordinator = insideBarrier(); 188 LOG.debug("Subprocedure '" + barrierName + "' locally completed"); 189 rethrowException(); 190 191 // Ack that the member has executed and released local barrier 192 rpcs.sendMemberCompleted(this, dataToCoordinator); 193 LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion"); 194 195 // make sure we didn't get an external exception 196 rethrowException(); 197 } catch (Exception e) { 198 String msg = null; 199 if (e instanceof InterruptedException) { 200 msg = "Procedure '" + barrierName + "' aborting due to interrupt!" + 201 " Likely due to pool shutdown."; 202 Thread.currentThread().interrupt(); 203 } else if (e instanceof ForeignException) { 204 msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!"; 205 } else { 206 msg = "Subprocedure '" + barrierName + "' failed!"; 207 } 208 cancel(msg, e); 209 210 LOG.debug("Subprocedure '" + barrierName + "' running cleanup."); 211 cleanup(e); 212 } finally { 213 releasedLocalBarrier.countDown(); 214 215 // tell the timer we are done, if we get here successfully 216 executionTimeoutTimer.complete(); 217 complete = true; 218 LOG.debug("Subprocedure '" + barrierName + "' completed."); 219 return null; 220 } 221 } 222 isComplete()223 boolean isComplete() { 224 return complete; 225 } 226 227 /** 228 * exposed for testing. 229 */ getErrorCheckable()230 ForeignExceptionSnare getErrorCheckable() { 231 return this.monitor; 232 } 233 234 /** 235 * The implementation of this method should gather and hold required resources (locks, disk 236 * space, etc) to satisfy the Procedures barrier condition. For example, this would be where 237 * to make all the regions on a RS on the quiescent for an procedure that required all regions 238 * to be globally quiesed. 239 * 240 * Users should override this method. If a quiescent is not required, this is overkill but 241 * can still be used to execute a procedure on all members and to propagate any exceptions. 242 * 243 * @throws ForeignException 244 */ acquireBarrier()245 abstract public void acquireBarrier() throws ForeignException; 246 247 /** 248 * The implementation of this method should act with the assumption that the barrier condition 249 * has been satisfied. Continuing the previous example, a condition could be that all RS's 250 * globally have been quiesced, and procedures that require this precondition could be 251 * implemented here. 252 * The implementation should also collect the result of the subprocedure as data to be returned 253 * to the coordinator upon successful completion. 254 * Users should override this method. 255 * @return the data the subprocedure wants to return to coordinator side. 256 * @throws ForeignException 257 */ insideBarrier()258 abstract public byte[] insideBarrier() throws ForeignException; 259 260 /** 261 * Users should override this method. This implementation of this method should rollback and 262 * cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have 263 * created. 264 * @param e 265 */ cleanup(Exception e)266 abstract public void cleanup(Exception e); 267 268 /** 269 * Method to cancel the Subprocedure by injecting an exception from and external source. 270 * @param cause 271 */ cancel(String msg, Throwable cause)272 public void cancel(String msg, Throwable cause) { 273 LOG.error(msg, cause); 274 complete = true; 275 if (cause instanceof ForeignException) { 276 monitor.receive((ForeignException) cause); 277 } else { 278 monitor.receive(new ForeignException(getMemberName(), cause)); 279 } 280 } 281 282 /** 283 * Callback for the member rpcs to call when the global barrier has been reached. This 284 * unblocks the main subprocedure exectuion thread so that the Subprocedure's 285 * {@link #insideBarrier()} method can be run. 286 */ receiveReachedGlobalBarrier()287 public void receiveReachedGlobalBarrier() { 288 inGlobalBarrier.countDown(); 289 } 290 291 // 292 // Subprocedure Internal State interface 293 // 294 295 /** 296 * Wait for the reached global barrier notification. 297 * 298 * Package visibility for testing 299 * 300 * @throws ForeignException 301 * @throws InterruptedException 302 */ waitForReachedGlobalBarrier()303 void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException { 304 Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency, 305 barrierName + ":remote acquired"); 306 } 307 308 /** 309 * Waits until the entire procedure has globally completed, or has been aborted. 310 * @throws ForeignException 311 * @throws InterruptedException 312 */ waitForLocallyCompleted()313 public void waitForLocallyCompleted() throws ForeignException, InterruptedException { 314 Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency, 315 barrierName + ":completed"); 316 } 317 318 /** 319 * Empty Subprocedure for testing. 320 * 321 * Must be public for stubbing used in testing to work. 322 */ 323 public static class SubprocedureImpl extends Subprocedure { 324 SubprocedureImpl(ProcedureMember member, String opName, ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout)325 public SubprocedureImpl(ProcedureMember member, String opName, 326 ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) { 327 super(member, opName, monitor, wakeFrequency, timeout); 328 } 329 330 @Override acquireBarrier()331 public void acquireBarrier() throws ForeignException {} 332 333 @Override insideBarrier()334 public byte[] insideBarrier() throws ForeignException { 335 return new byte[0]; 336 } 337 338 @Override cleanup(Exception e)339 public void cleanup(Exception e) {} 340 }; 341 } 342