1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.io.retry;
20 
21 import static org.apache.hadoop.io.retry.RetryPolicies.RETRY_FOREVER;
22 import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
23 import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
24 import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException;
25 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
26 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
27 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
28 import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
29 import static org.junit.Assert.*;
30 
31 import java.util.Collections;
32 import java.util.Map;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40 
41 import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
42 import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
43 import org.apache.hadoop.ipc.ProtocolTranslator;
44 import org.apache.hadoop.ipc.RemoteException;
45 import org.junit.Before;
46 import org.junit.Test;
47 
48 import java.lang.reflect.UndeclaredThrowableException;
49 
50 public class TestRetryProxy {
51 
52   private UnreliableImplementation unreliableImpl;
53 
54   @Before
setUp()55   public void setUp() throws Exception {
56     unreliableImpl = new UnreliableImplementation();
57   }
58 
59   @Test
testTryOnceThenFail()60   public void testTryOnceThenFail() throws UnreliableException {
61     UnreliableInterface unreliable = (UnreliableInterface)
62       RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_THEN_FAIL);
63     unreliable.alwaysSucceeds();
64     try {
65       unreliable.failsOnceThenSucceeds();
66       fail("Should fail");
67     } catch (UnreliableException e) {
68       // expected
69     }
70   }
71 
72   /**
73    * Test for {@link RetryInvocationHandler#isRpcInvocation(Object)}
74    */
75   @Test
testRpcInvocation()76   public void testRpcInvocation() throws Exception {
77     // For a proxy method should return true
78     final UnreliableInterface unreliable = (UnreliableInterface)
79       RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
80     assertTrue(RetryInvocationHandler.isRpcInvocation(unreliable));
81 
82     // Embed the proxy in ProtocolTranslator
83     ProtocolTranslator xlator = new ProtocolTranslator() {
84       int count = 0;
85       @Override
86       public Object getUnderlyingProxyObject() {
87         count++;
88         return unreliable;
89       }
90       @Override
91       public String toString() {
92         return "" + count;
93       }
94     };
95 
96     // For a proxy wrapped in ProtocolTranslator method should return true
97     assertTrue(RetryInvocationHandler.isRpcInvocation(xlator));
98     // Ensure underlying proxy was looked at
99     assertEquals(xlator.toString(), "1");
100 
101     // For non-proxy the method must return false
102     assertFalse(RetryInvocationHandler.isRpcInvocation(new Object()));
103   }
104 
105   @Test
testRetryForever()106   public void testRetryForever() throws UnreliableException {
107     UnreliableInterface unreliable = (UnreliableInterface)
108       RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
109     unreliable.alwaysSucceeds();
110     unreliable.failsOnceThenSucceeds();
111     unreliable.failsTenTimesThenSucceeds();
112   }
113 
114   @Test
testRetryUpToMaximumCountWithFixedSleep()115   public void testRetryUpToMaximumCountWithFixedSleep() throws UnreliableException {
116     UnreliableInterface unreliable = (UnreliableInterface)
117       RetryProxy.create(UnreliableInterface.class, unreliableImpl,
118                         retryUpToMaximumCountWithFixedSleep(8, 1, TimeUnit.NANOSECONDS));
119     unreliable.alwaysSucceeds();
120     unreliable.failsOnceThenSucceeds();
121     try {
122       unreliable.failsTenTimesThenSucceeds();
123       fail("Should fail");
124     } catch (UnreliableException e) {
125       // expected
126     }
127   }
128 
129   @Test
testRetryUpToMaximumTimeWithFixedSleep()130   public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException {
131     UnreliableInterface unreliable = (UnreliableInterface)
132       RetryProxy.create(UnreliableInterface.class, unreliableImpl,
133                         retryUpToMaximumTimeWithFixedSleep(80, 10, TimeUnit.NANOSECONDS));
134     unreliable.alwaysSucceeds();
135     unreliable.failsOnceThenSucceeds();
136     try {
137       unreliable.failsTenTimesThenSucceeds();
138       fail("Should fail");
139     } catch (UnreliableException e) {
140       // expected
141     }
142   }
143 
144   @Test
testRetryUpToMaximumCountWithProportionalSleep()145   public void testRetryUpToMaximumCountWithProportionalSleep() throws UnreliableException {
146     UnreliableInterface unreliable = (UnreliableInterface)
147       RetryProxy.create(UnreliableInterface.class, unreliableImpl,
148                         retryUpToMaximumCountWithProportionalSleep(8, 1, TimeUnit.NANOSECONDS));
149     unreliable.alwaysSucceeds();
150     unreliable.failsOnceThenSucceeds();
151     try {
152       unreliable.failsTenTimesThenSucceeds();
153       fail("Should fail");
154     } catch (UnreliableException e) {
155       // expected
156     }
157   }
158 
159   @Test
testExponentialRetry()160   public void testExponentialRetry() throws UnreliableException {
161     UnreliableInterface unreliable = (UnreliableInterface)
162       RetryProxy.create(UnreliableInterface.class, unreliableImpl,
163                         exponentialBackoffRetry(5, 1L, TimeUnit.NANOSECONDS));
164     unreliable.alwaysSucceeds();
165     unreliable.failsOnceThenSucceeds();
166     try {
167       unreliable.failsTenTimesThenSucceeds();
168       fail("Should fail");
169     } catch (UnreliableException e) {
170       // expected
171     }
172   }
173 
174   @Test
testRetryByException()175   public void testRetryByException() throws UnreliableException {
176     Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
177       Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(FatalException.class, TRY_ONCE_THEN_FAIL);
178 
179     UnreliableInterface unreliable = (UnreliableInterface)
180       RetryProxy.create(UnreliableInterface.class, unreliableImpl,
181                         retryByException(RETRY_FOREVER, exceptionToPolicyMap));
182     unreliable.failsOnceThenSucceeds();
183     try {
184       unreliable.alwaysFailsWithFatalException();
185       fail("Should fail");
186     } catch (FatalException e) {
187       // expected
188     }
189   }
190 
191   @Test
testRetryByRemoteException()192   public void testRetryByRemoteException() {
193     Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
194       Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(FatalException.class, TRY_ONCE_THEN_FAIL);
195 
196     UnreliableInterface unreliable = (UnreliableInterface)
197       RetryProxy.create(UnreliableInterface.class, unreliableImpl,
198                         retryByRemoteException(RETRY_FOREVER, exceptionToPolicyMap));
199     try {
200       unreliable.alwaysFailsWithRemoteFatalException();
201       fail("Should fail");
202     } catch (RemoteException e) {
203       // expected
204     }
205   }
206 
207   @Test
testRetryInterruptible()208   public void testRetryInterruptible() throws Throwable {
209     final UnreliableInterface unreliable = (UnreliableInterface)
210         RetryProxy.create(UnreliableInterface.class, unreliableImpl,
211             retryUpToMaximumTimeWithFixedSleep(10, 10, TimeUnit.SECONDS));
212 
213     final CountDownLatch latch = new CountDownLatch(1);
214     final AtomicReference<Thread> futureThread = new AtomicReference<Thread>();
215     ExecutorService exec = Executors.newSingleThreadExecutor();
216     Future<Throwable> future = exec.submit(new Callable<Throwable>(){
217       @Override
218       public Throwable call() throws Exception {
219         futureThread.set(Thread.currentThread());
220         latch.countDown();
221         try {
222           unreliable.alwaysFailsWithFatalException();
223         } catch (UndeclaredThrowableException ute) {
224           return ute.getCause();
225         }
226         return null;
227       }
228     });
229     latch.await();
230     Thread.sleep(1000); // time to fail and sleep
231     assertTrue(futureThread.get().isAlive());
232     futureThread.get().interrupt();
233     Throwable e = future.get(1, TimeUnit.SECONDS); // should return immediately
234     assertNotNull(e);
235     assertEquals(InterruptedException.class, e.getClass());
236     assertEquals("sleep interrupted", e.getMessage());
237   }
238 }
239