/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 
19 package org.apache.hadoop.yarn.client;
20 
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.fail;
26 
27 import java.io.IOException;
28 import java.net.HttpURLConnection;
29 import java.net.URL;
30 
31 import javax.servlet.http.HttpServletResponse;
32 
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.ha.ClientBaseWithFixes;
37 import org.apache.hadoop.ha.HAServiceProtocol;
38 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
39 import org.apache.hadoop.service.Service.STATE;
40 import org.apache.hadoop.yarn.api.records.ApplicationId;
41 import org.apache.hadoop.yarn.client.api.YarnClient;
42 import org.apache.hadoop.yarn.conf.HAUtil;
43 import org.apache.hadoop.yarn.conf.YarnConfiguration;
44 import org.apache.hadoop.yarn.exceptions.YarnException;
45 import org.apache.hadoop.yarn.server.MiniYARNCluster;
46 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
47 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
48 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
49 import org.apache.hadoop.yarn.webapp.YarnWebParams;
50 import org.junit.After;
51 import org.junit.Assert;
52 import org.junit.Before;
53 import org.junit.Test;
54 
55 public class TestRMFailover extends ClientBaseWithFixes {
56   private static final Log LOG =
57       LogFactory.getLog(TestRMFailover.class.getName());
58   private static final HAServiceProtocol.StateChangeRequestInfo req =
59       new HAServiceProtocol.StateChangeRequestInfo(
60           HAServiceProtocol.RequestSource.REQUEST_BY_USER);
61 
62   private static final String RM1_NODE_ID = "rm1";
63   private static final int RM1_PORT_BASE = 10000;
64   private static final String RM2_NODE_ID = "rm2";
65   private static final int RM2_PORT_BASE = 20000;
66 
67   private Configuration conf;
68   private MiniYARNCluster cluster;
69   private ApplicationId fakeAppId;
70 
71 
setConfForRM(String rmId, String prefix, String value)72   private void setConfForRM(String rmId, String prefix, String value) {
73     conf.set(HAUtil.addSuffix(prefix, rmId), value);
74   }
75 
setRpcAddressForRM(String rmId, int base)76   private void setRpcAddressForRM(String rmId, int base) {
77     setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
78         (base + YarnConfiguration.DEFAULT_RM_PORT));
79     setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
80         (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
81     setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
82         (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
83     setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" +
84         (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT));
85     setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
86         (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
87     setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
88         (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
89   }
90 
91   @Before
setup()92   public void setup() throws IOException {
93     fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
94     conf = new YarnConfiguration();
95     conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
96     conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
97     setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
98     setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
99 
100     conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
101 
102     conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
103     conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
104 
105     cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1);
106   }
107 
108   @After
teardown()109   public void teardown() {
110     cluster.stop();
111   }
112 
verifyClientConnection()113   private void verifyClientConnection() {
114     int numRetries = 3;
115     while(numRetries-- > 0) {
116       Configuration conf = new YarnConfiguration(this.conf);
117       YarnClient client = YarnClient.createYarnClient();
118       client.init(conf);
119       client.start();
120       try {
121         client.getApplications();
122         return;
123       } catch (Exception e) {
124         LOG.error(e);
125       } finally {
126         client.stop();
127       }
128     }
129     fail("Client couldn't connect to the Active RM");
130   }
131 
verifyConnections()132   private void verifyConnections() throws InterruptedException, YarnException {
133     assertTrue("NMs failed to connect to the RM",
134         cluster.waitForNodeManagersToConnect(20000));
135     verifyClientConnection();
136   }
137 
getAdminService(int index)138   private AdminService getAdminService(int index) {
139     return cluster.getResourceManager(index).getRMContext().getRMAdminService();
140   }
141 
explicitFailover()142   private void explicitFailover() throws IOException {
143     int activeRMIndex = cluster.getActiveRMIndex();
144     int newActiveRMIndex = (activeRMIndex + 1) % 2;
145     getAdminService(activeRMIndex).transitionToStandby(req);
146     getAdminService(newActiveRMIndex).transitionToActive(req);
147     assertEquals("Failover failed", newActiveRMIndex, cluster.getActiveRMIndex());
148   }
149 
failover()150   private void failover()
151       throws IOException, InterruptedException, YarnException {
152     int activeRMIndex = cluster.getActiveRMIndex();
153     cluster.stopResourceManager(activeRMIndex);
154     assertEquals("Failover failed",
155         (activeRMIndex + 1) % 2, cluster.getActiveRMIndex());
156     cluster.restartResourceManager(activeRMIndex);
157   }
158 
159   @Test
testExplicitFailover()160   public void testExplicitFailover()
161       throws YarnException, InterruptedException, IOException {
162     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
163     cluster.init(conf);
164     cluster.start();
165     getAdminService(0).transitionToActive(req);
166     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
167     verifyConnections();
168 
169     explicitFailover();
170     verifyConnections();
171 
172     explicitFailover();
173     verifyConnections();
174   }
175 
176   @Test
testAutomaticFailover()177   public void testAutomaticFailover()
178       throws YarnException, InterruptedException, IOException {
179     conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
180     conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
181     conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000);
182 
183     cluster.init(conf);
184     cluster.start();
185     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
186     verifyConnections();
187 
188     failover();
189     verifyConnections();
190 
191     failover();
192     verifyConnections();
193 
194     // Make the current Active handle an RMFatalEvent,
195     // so it transitions to standby.
196     ResourceManager rm = cluster.getResourceManager(
197         cluster.getActiveRMIndex());
198     rm.handleTransitionToStandBy();
199     int maxWaitingAttempts = 2000;
200     while (maxWaitingAttempts-- > 0 ) {
201       if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
202         break;
203       }
204       Thread.sleep(1);
205     }
206     Assert.assertFalse("RM didn't transition to Standby ",
207         maxWaitingAttempts == 0);
208     verifyConnections();
209   }
210 
211   @Test
testWebAppProxyInStandAloneMode()212   public void testWebAppProxyInStandAloneMode() throws YarnException,
213       InterruptedException, IOException {
214     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
215     WebAppProxyServer webAppProxyServer = new WebAppProxyServer();
216     try {
217       conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099");
218       cluster.init(conf);
219       cluster.start();
220       getAdminService(0).transitionToActive(req);
221       assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
222       verifyConnections();
223       webAppProxyServer.init(conf);
224 
225       // Start webAppProxyServer
226       Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState());
227       webAppProxyServer.start();
228       Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState());
229 
230       // send httpRequest with fakeApplicationId
231       // expect to get "Not Found" response and 404 response code
232       URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId);
233       HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
234           .openConnection();
235 
236       proxyConn.connect();
237       verifyResponse(proxyConn);
238 
239       explicitFailover();
240       verifyConnections();
241       proxyConn.connect();
242       verifyResponse(proxyConn);
243     } finally {
244       webAppProxyServer.stop();
245     }
246   }
247 
248   @Test
testEmbeddedWebAppProxy()249   public void testEmbeddedWebAppProxy() throws YarnException,
250       InterruptedException, IOException {
251     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
252     cluster.init(conf);
253     cluster.start();
254     getAdminService(0).transitionToActive(req);
255     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
256     verifyConnections();
257 
258     // send httpRequest with fakeApplicationId
259     // expect to get "Not Found" response and 404 response code
260     URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId);
261     HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
262         .openConnection();
263 
264     proxyConn.connect();
265     verifyResponse(proxyConn);
266 
267     explicitFailover();
268     verifyConnections();
269     proxyConn.connect();
270     verifyResponse(proxyConn);
271   }
272 
verifyResponse(HttpURLConnection response)273   private void verifyResponse(HttpURLConnection response)
274       throws IOException {
275     assertEquals("Not Found", response.getResponseMessage());
276     assertEquals(404, response.getResponseCode());
277   }
278 
279   @Test
testRMWebAppRedirect()280   public void testRMWebAppRedirect() throws YarnException,
281       InterruptedException, IOException {
282     cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 0, 1, 1);
283     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
284 
285     cluster.init(conf);
286     cluster.start();
287     getAdminService(0).transitionToActive(req);
288     String rm1Url = "http://0.0.0.0:18088";
289     String rm2Url = "http://0.0.0.0:28088";
290 
291     String redirectURL = getRedirectURL(rm2Url);
292     // if uri is null, RMWebAppFilter will append a slash at the trail of the redirection url
293     assertEquals(redirectURL,rm1Url+"/");
294 
295     redirectURL = getRedirectURL(rm2Url + "/metrics");
296     assertEquals(redirectURL,rm1Url + "/metrics");
297 
298     redirectURL = getRedirectURL(rm2Url + "/jmx");
299     assertEquals(redirectURL,rm1Url + "/jmx");
300 
301     // standby RM links /conf, /stacks, /logLevel, /static, /logs,
302     // /cluster/cluster as well as webService
303     // /ws/v1/cluster/info should not be redirected to active RM
304     redirectURL = getRedirectURL(rm2Url + "/cluster/cluster");
305     assertNull(redirectURL);
306 
307     redirectURL = getRedirectURL(rm2Url + "/conf");
308     assertNull(redirectURL);
309 
310     redirectURL = getRedirectURL(rm2Url + "/stacks");
311     assertNull(redirectURL);
312 
313     redirectURL = getRedirectURL(rm2Url + "/logLevel");
314     assertNull(redirectURL);
315 
316     redirectURL = getRedirectURL(rm2Url + "/static");
317     assertNull(redirectURL);
318 
319     redirectURL = getRedirectURL(rm2Url + "/logs");
320     assertNull(redirectURL);
321 
322     redirectURL = getRedirectURL(rm2Url + "/ws/v1/cluster/info");
323     assertNull(redirectURL);
324 
325     redirectURL = getRedirectURL(rm2Url + "/ws/v1/cluster/apps");
326     assertEquals(redirectURL, rm1Url + "/ws/v1/cluster/apps");
327 
328     redirectURL = getRedirectURL(rm2Url + "/proxy/" + fakeAppId);
329     assertNull(redirectURL);
330 
331     // transit the active RM to standby
332     // Both of RMs are in standby mode
333     getAdminService(0).transitionToStandby(req);
334     // RM2 is expected to send the httpRequest to itself.
335     // The Header Field: Refresh is expected to be set.
336     redirectURL = getRefreshURL(rm2Url);
337     assertTrue(redirectURL != null
338         && redirectURL.contains(YarnWebParams.NEXT_REFRESH_INTERVAL)
339         && redirectURL.contains(rm2Url));
340 
341   }
342 
343   // set up http connection with the given url and get the redirection url from the response
344   // return null if the url is not redirected
getRedirectURL(String url)345   static String getRedirectURL(String url) {
346     String redirectUrl = null;
347     try {
348       HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
349       // do not automatically follow the redirection
350       // otherwise we get too many redirections exception
351       conn.setInstanceFollowRedirects(false);
352       if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT)
353         redirectUrl = conn.getHeaderField("Location");
354     } catch (Exception e) {
355       // throw new RuntimeException(e);
356     }
357     return redirectUrl;
358   }
359 
getRefreshURL(String url)360   static String getRefreshURL(String url) {
361     String redirectUrl = null;
362     try {
363       HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
364       // do not automatically follow the redirection
365       // otherwise we get too many redirections exception
366       conn.setInstanceFollowRedirects(false);
367       redirectUrl = conn.getHeaderField("Refresh");
368     } catch (Exception e) {
369       // throw new RuntimeException(e);
370     }
371     return redirectUrl;
372   }
373 }
374