1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 20 package org.apache.hadoop.hbase.client; 21 22 import java.io.IOException; 23 import java.io.InterruptedIOException; 24 import java.lang.reflect.UndeclaredThrowableException; 25 import java.net.SocketTimeoutException; 26 import java.util.ArrayList; 27 import java.util.List; 28 import java.util.concurrent.atomic.AtomicBoolean; 29 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 import org.apache.hadoop.hbase.classification.InterfaceAudience; 33 import org.apache.hadoop.hbase.DoNotRetryIOException; 34 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; 35 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 36 import org.apache.hadoop.hbase.util.ExceptionUtil; 37 import org.apache.hadoop.ipc.RemoteException; 38 39 import com.google.protobuf.ServiceException; 40 41 /** 42 * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client 43 * threadlocal outstanding timeouts as so we don't persist too much. 44 * Dynamic rather than static so can set the generic appropriately. 45 * 46 * This object has a state. It should not be used by in parallel by different threads. 47 * Reusing it is possible however, even between multiple threads. However, the user will 48 * have to manage the synchronization on its side: there is no synchronization inside the class. 49 */ 50 @InterfaceAudience.Private 51 public class RpcRetryingCaller<T> { 52 public static final Log LOG = LogFactory.getLog(RpcRetryingCaller.class); 53 /** 54 * When we started making calls. 55 */ 56 private long globalStartTime; 57 /** 58 * Start and end times for a single call. 59 */ 60 private final static int MIN_RPC_TIMEOUT = 2000; 61 /** How many retries are allowed before we start to log */ 62 private final int startLogErrorsCnt; 63 64 private final long pause; 65 private final int retries; 66 private final AtomicBoolean cancelled = new AtomicBoolean(false); 67 private final RetryingCallerInterceptor interceptor; 68 private final RetryingCallerInterceptorContext context; 69 RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt)70 public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) { 71 this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); 72 } 73 RpcRetryingCaller(long pause, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt)74 public RpcRetryingCaller(long pause, int retries, 75 RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { 76 this.pause = pause; 77 this.retries = retries; 78 this.interceptor = interceptor; 79 context = interceptor.createEmptyContext(); 80 this.startLogErrorsCnt = startLogErrorsCnt; 81 } 82 getRemainingTime(int callTimeout)83 private int getRemainingTime(int callTimeout) { 84 if (callTimeout <= 0) { 85 return 0; 86 } else { 87 if (callTimeout == Integer.MAX_VALUE) return Integer.MAX_VALUE; 88 int remainingTime = (int) (callTimeout - 89 (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); 90 if (remainingTime < MIN_RPC_TIMEOUT) { 91 // If there is no time left, we're trying anyway. It's too late. 92 // 0 means no timeout, and it's not the intent here. So we secure both cases by 93 // resetting to the minimum. 94 remainingTime = MIN_RPC_TIMEOUT; 95 } 96 return remainingTime; 97 } 98 } 99 cancel()100 public void cancel(){ 101 synchronized (cancelled){ 102 cancelled.set(true); 103 cancelled.notifyAll(); 104 } 105 } 106 107 /** 108 * Retries if invocation fails. 109 * @param callTimeout Timeout for this call 110 * @param callable The {@link RetryingCallable} to run. 111 * @return an object of type T 112 * @throws IOException if a remote or network exception occurs 113 * @throws RuntimeException other unspecified error 114 */ callWithRetries(RetryingCallable<T> callable, int callTimeout)115 public T callWithRetries(RetryingCallable<T> callable, int callTimeout) 116 throws IOException, RuntimeException { 117 List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = 118 new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>(); 119 this.globalStartTime = EnvironmentEdgeManager.currentTime(); 120 context.clear(); 121 for (int tries = 0;; tries++) { 122 long expectedSleep; 123 try { 124 callable.prepare(tries != 0); // if called with false, check table status on ZK 125 interceptor.intercept(context.prepare(callable, tries)); 126 return callable.call(getRemainingTime(callTimeout)); 127 } catch (PreemptiveFastFailException e) { 128 throw e; 129 } catch (Throwable t) { 130 ExceptionUtil.rethrowIfInterrupt(t); 131 if (tries > startLogErrorsCnt) { 132 LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + 133 (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " 134 + "cancelled=" + cancelled.get() + ", msg=" 135 + callable.getExceptionMessageAdditionalDetail()); 136 } 137 138 // translateException throws exception when should not retry: i.e. when request is bad. 139 interceptor.handleFailure(context, t); 140 t = translateException(t); 141 callable.throwable(t, retries != 1); 142 RetriesExhaustedException.ThrowableWithExtraContext qt = 143 new RetriesExhaustedException.ThrowableWithExtraContext(t, 144 EnvironmentEdgeManager.currentTime(), toString()); 145 exceptions.add(qt); 146 if (tries >= retries - 1) { 147 throw new RetriesExhaustedException(tries, exceptions); 148 } 149 // If the server is dead, we need to wait a little before retrying, to give 150 // a chance to the regions to be 151 // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time 152 expectedSleep = callable.sleep(pause, tries + 1); 153 154 // If, after the planned sleep, there won't be enough time left, we stop now. 155 long duration = singleCallDuration(expectedSleep); 156 if (duration > callTimeout) { 157 String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + 158 ": " + callable.getExceptionMessageAdditionalDetail(); 159 throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); 160 } 161 } finally { 162 interceptor.updateFailureInfo(context); 163 } 164 try { 165 if (expectedSleep > 0) { 166 synchronized (cancelled) { 167 if (cancelled.get()) return null; 168 cancelled.wait(expectedSleep); 169 } 170 } 171 if (cancelled.get()) return null; 172 } catch (InterruptedException e) { 173 throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); 174 } 175 } 176 } 177 178 /** 179 * @return Calculate how long a single call took 180 */ singleCallDuration(final long expectedSleep)181 private long singleCallDuration(final long expectedSleep) { 182 return (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + expectedSleep; 183 } 184 185 /** 186 * Call the server once only. 187 * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you 188 * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely 189 * succeed). 190 * @return an object of type T 191 * @throws IOException if a remote or network exception occurs 192 * @throws RuntimeException other unspecified error 193 */ callWithoutRetries(RetryingCallable<T> callable, int callTimeout)194 public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) 195 throws IOException, RuntimeException { 196 // The code of this method should be shared with withRetries. 197 this.globalStartTime = EnvironmentEdgeManager.currentTime(); 198 try { 199 callable.prepare(false); 200 return callable.call(callTimeout); 201 } catch (Throwable t) { 202 Throwable t2 = translateException(t); 203 ExceptionUtil.rethrowIfInterrupt(t2); 204 // It would be nice to clear the location cache here. 205 if (t2 instanceof IOException) { 206 throw (IOException)t2; 207 } else { 208 throw new RuntimeException(t2); 209 } 210 } 211 } 212 213 /** 214 * Get the good or the remote exception if any, throws the DoNotRetryIOException. 215 * @param t the throwable to analyze 216 * @return the translated exception, if it's not a DoNotRetryIOException 217 * @throws DoNotRetryIOException - if we find it, we throw it instead of translating. 218 */ translateException(Throwable t)219 static Throwable translateException(Throwable t) throws DoNotRetryIOException { 220 if (t instanceof UndeclaredThrowableException) { 221 if (t.getCause() != null) { 222 t = t.getCause(); 223 } 224 } 225 if (t instanceof RemoteException) { 226 t = ((RemoteException)t).unwrapRemoteException(); 227 } 228 if (t instanceof LinkageError) { 229 throw new DoNotRetryIOException(t); 230 } 231 if (t instanceof ServiceException) { 232 ServiceException se = (ServiceException)t; 233 Throwable cause = se.getCause(); 234 if (cause != null) { 235 if (cause instanceof DoNotRetryIOException) { 236 throw (DoNotRetryIOException)cause; 237 } else if (cause instanceof NeedUnmanagedConnectionException) { 238 throw new DoNotRetryIOException(cause); 239 } 240 } 241 // Don't let ServiceException out; its rpc specific. 242 t = cause; 243 // t could be a RemoteException so go aaround again. 244 translateException(t); 245 } else if (t instanceof DoNotRetryIOException) { 246 throw (DoNotRetryIOException)t; 247 } else if (t instanceof NeedUnmanagedConnectionException) { 248 throw new DoNotRetryIOException(t); 249 } 250 return t; 251 } 252 253 @Override toString()254 public String toString() { 255 return "RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + 256 ", pause=" + pause + ", retries=" + retries + '}'; 257 } 258 } 259