1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 20 package org.apache.hadoop.hbase; 21 22 import java.text.MessageFormat; 23 24 import junit.framework.Assert; 25 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import org.apache.hadoop.hbase.classification.InterfaceAudience; 29 import org.apache.hadoop.conf.Configuration; 30 31 /** 32 * A class that provides a standard waitFor pattern 33 * See details at https://issues.apache.org/jira/browse/HBASE-7384 34 */ 35 @InterfaceAudience.Private 36 public final class Waiter { 37 38 private static final Log LOG = LogFactory.getLog(Waiter.class); 39 40 /** 41 * System property name whose value is a scale factor to increase time out values dynamically used 42 * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)}, 43 * {@link #waitFor(Configuration, long, long, Predicate)}, and 44 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method 45 * <p/> 46 * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout 47 */ 48 public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio"; 49 50 private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1; 51 52 private static float waitForRatio = -1; 53 Waiter()54 private Waiter() { 55 } 56 57 /** 58 * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)}, 59 * {@link #waitFor(Configuration, long, Predicate)}, 60 * {@link #waitFor(Configuration, long, long, Predicate)} and 61 * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class 62 * <p/> 63 * This is useful to dynamically adjust max time out values when same test cases run in different 64 * test machine settings without recompiling & re-deploying code. 65 * <p/> 66 * The value is obtained from the Java System property or configuration setting 67 * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>. 68 * @param conf the configuration 69 * @return the 'wait for ratio' for the current test run. 70 */ getWaitForRatio(Configuration conf)71 public static float getWaitForRatio(Configuration conf) { 72 if (waitForRatio < 0) { 73 // System property takes precedence over configuration setting 74 if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) { 75 waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO)); 76 } else { 77 waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT); 78 } 79 } 80 return waitForRatio; 81 } 82 83 /** 84 * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and 85 * {@link Waiter#waitFor(Configuration, long, Predicate)} and 86 * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate) methods. 87 */ 88 @InterfaceAudience.Private 89 public interface Predicate<E extends Exception> { 90 91 /** 92 * Perform a predicate evaluation. 93 * @return the boolean result of the evaluation. 94 * @throws Exception thrown if the predicate evaluation could not evaluate. 95 */ evaluate()96 boolean evaluate() throws E; 97 98 } 99 100 /** 101 * A mixin interface, can be used with {@link Waiter} to explain failed state. 102 */ 103 @InterfaceAudience.Private 104 public interface ExplainingPredicate<E extends Exception> extends Predicate<E> { 105 106 /** 107 * Perform a predicate evaluation. 108 * 109 * @return explanation of failed state 110 */ explainFailure()111 String explainFailure() throws E; 112 113 } 114 115 /** 116 * Makes the current thread sleep for the duration equal to the specified time in milliseconds 117 * multiplied by the {@link #getWaitForRatio(Configuration)}. 118 * @param conf the configuration 119 * @param time the number of milliseconds to sleep. 120 */ sleep(Configuration conf, long time)121 public static void sleep(Configuration conf, long time) { 122 try { 123 Thread.sleep((long) (getWaitForRatio(conf) * time)); 124 } catch (InterruptedException ex) { 125 LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString())); 126 } 127 } 128 129 /** 130 * Waits up to the duration equal to the specified timeout multiplied by the 131 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 132 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still 133 * <code>false</code>. 134 * <p/> 135 * @param conf the configuration 136 * @param timeout the timeout in milliseconds to wait for the predicate. 137 * @param predicate the predicate to evaluate. 138 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 139 * wait is interrupted otherwise <code>-1</code> when times out 140 */ waitFor(Configuration conf, long timeout, Predicate<E> predicate)141 public static <E extends Exception> long waitFor(Configuration conf, long timeout, 142 Predicate<E> predicate) throws E { 143 return waitFor(conf, timeout, 100, true, predicate); 144 } 145 146 /** 147 * Waits up to the duration equal to the specified timeout multiplied by the 148 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 149 * <code>true</code>, failing the test if the timeout is reached and the Predicate is still 150 * <code>false</code>. 151 * <p/> 152 * @param conf the configuration 153 * @param timeout the max timeout in milliseconds to wait for the predicate. 154 * @param interval the interval in milliseconds to evaluate predicate. 155 * @param predicate the predicate to evaluate. 156 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 157 * wait is interrupted otherwise <code>-1</code> when times out 158 */ waitFor(Configuration conf, long timeout, long interval, Predicate<E> predicate)159 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval, 160 Predicate<E> predicate) throws E { 161 return waitFor(conf, timeout, interval, true, predicate); 162 } 163 164 /** 165 * Waits up to the duration equal to the specified timeout multiplied by the 166 * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become 167 * <code>true</code>, failing the test if the timeout is reached, the Predicate is still 168 * <code>false</code> and failIfTimeout is set as <code>true</code>. 169 * <p/> 170 * @param conf the configuration 171 * @param timeout the timeout in milliseconds to wait for the predicate. 172 * @param interval the interval in milliseconds to evaluate predicate. 173 * @param failIfTimeout indicates if should fail current test case when times out. 174 * @param predicate the predicate to evaluate. 175 * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or 176 * wait is interrupted otherwise <code>-1</code> when times out 177 */ waitFor(Configuration conf, long timeout, long interval, boolean failIfTimeout, Predicate<E> predicate)178 public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval, 179 boolean failIfTimeout, Predicate<E> predicate) throws E { 180 long started = System.currentTimeMillis(); 181 long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout); 182 long mustEnd = started + adjustedTimeout; 183 long remainderWait = 0; 184 long sleepInterval = 0; 185 Boolean eval = false; 186 Boolean interrupted = false; 187 188 try { 189 LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])", 190 adjustedTimeout, getWaitForRatio(conf))); 191 while (!(eval = predicate.evaluate()) 192 && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) { 193 try { 194 // handle tail case when remainder wait is less than one interval 195 sleepInterval = (remainderWait > interval) ? interval : remainderWait; 196 Thread.sleep(sleepInterval); 197 } catch (InterruptedException e) { 198 eval = predicate.evaluate(); 199 interrupted = true; 200 break; 201 } 202 } 203 if (!eval) { 204 if (interrupted) { 205 LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec", 206 System.currentTimeMillis() - started)); 207 } else if (failIfTimeout) { 208 String msg = getExplanation(predicate); 209 Assert.fail(MessageFormat 210 .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg); 211 } else { 212 String msg = getExplanation(predicate); 213 LOG.warn( 214 MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg); 215 } 216 } 217 return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1; 218 } catch (Exception ex) { 219 throw new RuntimeException(ex); 220 } 221 } 222 getExplanation(Predicate explain)223 public static String getExplanation(Predicate explain) { 224 if (explain instanceof ExplainingPredicate) { 225 try { 226 return " " + ((ExplainingPredicate) explain).explainFailure(); 227 } catch (Exception e) { 228 LOG.error("Failed to get explanation, ", e); 229 return e.getMessage(); 230 } 231 } else { 232 return ""; 233 } 234 } 235 236 } 237