1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 
20 package org.apache.hadoop.hbase;
21 
22 import java.text.MessageFormat;
23 
24 import junit.framework.Assert;
25 
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 
31 /**
32  * A class that provides a standard waitFor pattern
33  * See details at https://issues.apache.org/jira/browse/HBASE-7384
34  */
35 @InterfaceAudience.Private
36 public final class Waiter {
37 
38   private static final Log LOG = LogFactory.getLog(Waiter.class);
39 
40   /**
41    * System property name whose value is a scale factor to increase time out values dynamically used
42    * in {@link #sleep(Configuration, long)}, {@link #waitFor(Configuration, long, Predicate)},
43    * {@link #waitFor(Configuration, long, long, Predicate)}, and
44    * {@link #waitFor(Configuration, long, long, boolean, Predicate)} method
45    * <p/>
46    * The actual time out value will equal to hbase.test.wait.for.ratio * passed-in timeout
47    */
48   public static final String HBASE_TEST_WAIT_FOR_RATIO = "hbase.test.wait.for.ratio";
49 
50   private static float HBASE_WAIT_FOR_RATIO_DEFAULT = 1;
51 
52   private static float waitForRatio = -1;
53 
Waiter()54   private Waiter() {
55   }
56 
57   /**
58    * Returns the 'wait for ratio' used in the {@link #sleep(Configuration, long)},
59    * {@link #waitFor(Configuration, long, Predicate)},
60    * {@link #waitFor(Configuration, long, long, Predicate)} and
61    * {@link #waitFor(Configuration, long, long, boolean, Predicate)} methods of the class
62    * <p/>
63    * This is useful to dynamically adjust max time out values when same test cases run in different
64    * test machine settings without recompiling & re-deploying code.
65    * <p/>
66    * The value is obtained from the Java System property or configuration setting
67    * <code>hbase.test.wait.for.ratio</code> which defaults to <code>1</code>.
68    * @param conf the configuration
69    * @return the 'wait for ratio' for the current test run.
70    */
getWaitForRatio(Configuration conf)71   public static float getWaitForRatio(Configuration conf) {
72     if (waitForRatio < 0) {
73       // System property takes precedence over configuration setting
74       if (System.getProperty(HBASE_TEST_WAIT_FOR_RATIO) != null) {
75         waitForRatio = Float.parseFloat(System.getProperty(HBASE_TEST_WAIT_FOR_RATIO));
76       } else {
77         waitForRatio = conf.getFloat(HBASE_TEST_WAIT_FOR_RATIO, HBASE_WAIT_FOR_RATIO_DEFAULT);
78       }
79     }
80     return waitForRatio;
81   }
82 
83   /**
84    * A predicate 'closure' used by the {@link Waiter#waitFor(Configuration, long, Predicate)} and
85    * {@link Waiter#waitFor(Configuration, long, Predicate)} and
86    * {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate) methods.
87    */
88   @InterfaceAudience.Private
89   public interface Predicate<E extends Exception> {
90 
91     /**
92      * Perform a predicate evaluation.
93      * @return the boolean result of the evaluation.
94      * @throws Exception thrown if the predicate evaluation could not evaluate.
95      */
evaluate()96     boolean evaluate() throws E;
97 
98   }
99 
100   /**
101    * A mixin interface, can be used with {@link Waiter} to explain failed state.
102    */
103   @InterfaceAudience.Private
104   public interface ExplainingPredicate<E extends Exception> extends Predicate<E> {
105 
106     /**
107      * Perform a predicate evaluation.
108      *
109      * @return explanation of failed state
110      */
explainFailure()111     String explainFailure() throws E;
112 
113   }
114 
115   /**
116    * Makes the current thread sleep for the duration equal to the specified time in milliseconds
117    * multiplied by the {@link #getWaitForRatio(Configuration)}.
118    * @param conf the configuration
119    * @param time the number of milliseconds to sleep.
120    */
sleep(Configuration conf, long time)121   public static void sleep(Configuration conf, long time) {
122     try {
123       Thread.sleep((long) (getWaitForRatio(conf) * time));
124     } catch (InterruptedException ex) {
125       LOG.warn(MessageFormat.format("Sleep interrupted, {0}", ex.toString()));
126     }
127   }
128 
129   /**
130    * Waits up to the duration equal to the specified timeout multiplied by the
131    * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
132    * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
133    * <code>false</code>.
134    * <p/>
135    * @param conf the configuration
136    * @param timeout the timeout in milliseconds to wait for the predicate.
137    * @param predicate the predicate to evaluate.
138    * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
139    *         wait is interrupted otherwise <code>-1</code> when times out
140    */
waitFor(Configuration conf, long timeout, Predicate<E> predicate)141   public static <E extends Exception> long waitFor(Configuration conf, long timeout,
142       Predicate<E> predicate) throws E {
143     return waitFor(conf, timeout, 100, true, predicate);
144   }
145 
146   /**
147    * Waits up to the duration equal to the specified timeout multiplied by the
148    * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
149    * <code>true</code>, failing the test if the timeout is reached and the Predicate is still
150    * <code>false</code>.
151    * <p/>
152    * @param conf the configuration
153    * @param timeout the max timeout in milliseconds to wait for the predicate.
154    * @param interval the interval in milliseconds to evaluate predicate.
155    * @param predicate the predicate to evaluate.
156    * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
157    *         wait is interrupted otherwise <code>-1</code> when times out
158    */
waitFor(Configuration conf, long timeout, long interval, Predicate<E> predicate)159   public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
160       Predicate<E> predicate) throws E {
161     return waitFor(conf, timeout, interval, true, predicate);
162   }
163 
164   /**
165    * Waits up to the duration equal to the specified timeout multiplied by the
166    * {@link #getWaitForRatio(Configuration)} for the given {@link Predicate} to become
167    * <code>true</code>, failing the test if the timeout is reached, the Predicate is still
168    * <code>false</code> and failIfTimeout is set as <code>true</code>.
169    * <p/>
170    * @param conf the configuration
171    * @param timeout the timeout in milliseconds to wait for the predicate.
172    * @param interval the interval in milliseconds to evaluate predicate.
173    * @param failIfTimeout indicates if should fail current test case when times out.
174    * @param predicate the predicate to evaluate.
175    * @return the effective wait, in milli-seconds until the predicate becomes <code>true</code> or
176    *         wait is interrupted otherwise <code>-1</code> when times out
177    */
waitFor(Configuration conf, long timeout, long interval, boolean failIfTimeout, Predicate<E> predicate)178   public static <E extends Exception> long waitFor(Configuration conf, long timeout, long interval,
179       boolean failIfTimeout, Predicate<E> predicate) throws E {
180     long started = System.currentTimeMillis();
181     long adjustedTimeout = (long) (getWaitForRatio(conf) * timeout);
182     long mustEnd = started + adjustedTimeout;
183     long remainderWait = 0;
184     long sleepInterval = 0;
185     Boolean eval = false;
186     Boolean interrupted = false;
187 
188     try {
189       LOG.info(MessageFormat.format("Waiting up to [{0}] milli-secs(wait.for.ratio=[{1}])",
190         adjustedTimeout, getWaitForRatio(conf)));
191       while (!(eval = predicate.evaluate())
192               && (remainderWait = mustEnd - System.currentTimeMillis()) > 0) {
193         try {
194           // handle tail case when remainder wait is less than one interval
195           sleepInterval = (remainderWait > interval) ? interval : remainderWait;
196           Thread.sleep(sleepInterval);
197         } catch (InterruptedException e) {
198           eval = predicate.evaluate();
199           interrupted = true;
200           break;
201         }
202       }
203       if (!eval) {
204         if (interrupted) {
205           LOG.warn(MessageFormat.format("Waiting interrupted after [{0}] msec",
206             System.currentTimeMillis() - started));
207         } else if (failIfTimeout) {
208           String msg = getExplanation(predicate);
209           Assert.fail(MessageFormat
210               .format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
211         } else {
212           String msg = getExplanation(predicate);
213           LOG.warn(
214               MessageFormat.format("Waiting timed out after [{0}] msec", adjustedTimeout) + msg);
215         }
216       }
217       return (eval || interrupted) ? (System.currentTimeMillis() - started) : -1;
218     } catch (Exception ex) {
219       throw new RuntimeException(ex);
220     }
221   }
222 
getExplanation(Predicate explain)223   public static String getExplanation(Predicate explain) {
224     if (explain instanceof ExplainingPredicate) {
225       try {
226         return " " + ((ExplainingPredicate) explain).explainFailure();
227       } catch (Exception e) {
228         LOG.error("Failed to get explanation, ", e);
229         return e.getMessage();
230       }
231     } else {
232       return "";
233     }
234   }
235 
236 }
237