1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.client; 20 21 import static org.junit.Assert.assertEquals; 22 import static org.junit.Assert.assertFalse; 23 import static org.junit.Assert.assertTrue; 24 import static org.junit.Assert.assertNull; 25 import static org.junit.Assert.fail; 26 27 import java.io.IOException; 28 import java.net.HttpURLConnection; 29 import java.net.URL; 30 31 import javax.servlet.http.HttpServletResponse; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.conf.Configuration; 36 import org.apache.hadoop.ha.ClientBaseWithFixes; 37 import org.apache.hadoop.ha.HAServiceProtocol; 38 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; 39 import org.apache.hadoop.service.Service.STATE; 40 import org.apache.hadoop.yarn.api.records.ApplicationId; 41 import org.apache.hadoop.yarn.client.api.YarnClient; 42 import org.apache.hadoop.yarn.conf.HAUtil; 43 import org.apache.hadoop.yarn.conf.YarnConfiguration; 44 import org.apache.hadoop.yarn.exceptions.YarnException; 45 import org.apache.hadoop.yarn.server.MiniYARNCluster; 46 import org.apache.hadoop.yarn.server.resourcemanager.AdminService; 47 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; 48 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; 49 import org.apache.hadoop.yarn.webapp.YarnWebParams; 50 import org.junit.After; 51 import org.junit.Assert; 52 import org.junit.Before; 53 import org.junit.Test; 54 55 public class TestRMFailover extends ClientBaseWithFixes { 56 private static final Log LOG = 57 LogFactory.getLog(TestRMFailover.class.getName()); 58 private static final HAServiceProtocol.StateChangeRequestInfo req = 59 new HAServiceProtocol.StateChangeRequestInfo( 60 HAServiceProtocol.RequestSource.REQUEST_BY_USER); 61 62 private static final String RM1_NODE_ID = "rm1"; 63 private static final int RM1_PORT_BASE = 10000; 64 private static final String RM2_NODE_ID = "rm2"; 65 private static final int RM2_PORT_BASE = 20000; 66 67 private Configuration conf; 68 private MiniYARNCluster cluster; 69 private ApplicationId fakeAppId; 70 71 setConfForRM(String rmId, String prefix, String value)72 private void setConfForRM(String rmId, String prefix, String value) { 73 conf.set(HAUtil.addSuffix(prefix, rmId), value); 74 } 75 setRpcAddressForRM(String rmId, int base)76 private void setRpcAddressForRM(String rmId, int base) { 77 setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + 78 (base + YarnConfiguration.DEFAULT_RM_PORT)); 79 setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + 80 (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); 81 setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + 82 (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); 83 setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" + 84 (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT)); 85 setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + 86 (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); 87 setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + 88 (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); 89 } 90 91 @Before setup()92 public void setup() throws IOException { 93 fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0); 94 conf = new YarnConfiguration(); 95 conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); 96 conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); 97 setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); 98 setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); 99 100 conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); 101 102 conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); 103 conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); 104 105 cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1); 106 } 107 108 @After teardown()109 public void teardown() { 110 cluster.stop(); 111 } 112 verifyClientConnection()113 private void verifyClientConnection() { 114 int numRetries = 3; 115 while(numRetries-- > 0) { 116 Configuration conf = new YarnConfiguration(this.conf); 117 YarnClient client = YarnClient.createYarnClient(); 118 client.init(conf); 119 client.start(); 120 try { 121 client.getApplications(); 122 return; 123 } catch (Exception e) { 124 LOG.error(e); 125 } finally { 126 client.stop(); 127 } 128 } 129 fail("Client couldn't connect to the Active RM"); 130 } 131 verifyConnections()132 private void verifyConnections() throws InterruptedException, YarnException { 133 assertTrue("NMs failed to connect to the RM", 134 cluster.waitForNodeManagersToConnect(20000)); 135 verifyClientConnection(); 136 } 137 getAdminService(int index)138 private AdminService getAdminService(int index) { 139 return cluster.getResourceManager(index).getRMContext().getRMAdminService(); 140 } 141 explicitFailover()142 private void explicitFailover() throws IOException { 143 int activeRMIndex = cluster.getActiveRMIndex(); 144 int newActiveRMIndex = (activeRMIndex + 1) % 2; 145 getAdminService(activeRMIndex).transitionToStandby(req); 146 getAdminService(newActiveRMIndex).transitionToActive(req); 147 assertEquals("Failover failed", newActiveRMIndex, cluster.getActiveRMIndex()); 148 } 149 failover()150 private void failover() 151 throws IOException, InterruptedException, YarnException { 152 int activeRMIndex = cluster.getActiveRMIndex(); 153 cluster.stopResourceManager(activeRMIndex); 154 assertEquals("Failover failed", 155 (activeRMIndex + 1) % 2, cluster.getActiveRMIndex()); 156 cluster.restartResourceManager(activeRMIndex); 157 } 158 159 @Test testExplicitFailover()160 public void testExplicitFailover() 161 throws YarnException, InterruptedException, IOException { 162 conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); 163 cluster.init(conf); 164 cluster.start(); 165 getAdminService(0).transitionToActive(req); 166 assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); 167 verifyConnections(); 168 169 explicitFailover(); 170 verifyConnections(); 171 172 explicitFailover(); 173 verifyConnections(); 174 } 175 176 @Test testAutomaticFailover()177 public void testAutomaticFailover() 178 throws YarnException, InterruptedException, IOException { 179 conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster"); 180 conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); 181 conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000); 182 183 cluster.init(conf); 184 cluster.start(); 185 assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); 186 verifyConnections(); 187 188 failover(); 189 verifyConnections(); 190 191 failover(); 192 verifyConnections(); 193 194 // Make the current Active handle an RMFatalEvent, 195 // so it transitions to standby. 196 ResourceManager rm = cluster.getResourceManager( 197 cluster.getActiveRMIndex()); 198 rm.handleTransitionToStandBy(); 199 int maxWaitingAttempts = 2000; 200 while (maxWaitingAttempts-- > 0 ) { 201 if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { 202 break; 203 } 204 Thread.sleep(1); 205 } 206 Assert.assertFalse("RM didn't transition to Standby ", 207 maxWaitingAttempts == 0); 208 verifyConnections(); 209 } 210 211 @Test testWebAppProxyInStandAloneMode()212 public void testWebAppProxyInStandAloneMode() throws YarnException, 213 InterruptedException, IOException { 214 conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); 215 WebAppProxyServer webAppProxyServer = new WebAppProxyServer(); 216 try { 217 conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099"); 218 cluster.init(conf); 219 cluster.start(); 220 getAdminService(0).transitionToActive(req); 221 assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); 222 verifyConnections(); 223 webAppProxyServer.init(conf); 224 225 // Start webAppProxyServer 226 Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState()); 227 webAppProxyServer.start(); 228 Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState()); 229 230 // send httpRequest with fakeApplicationId 231 // expect to get "Not Found" response and 404 response code 232 URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId); 233 HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl 234 .openConnection(); 235 236 proxyConn.connect(); 237 verifyResponse(proxyConn); 238 239 explicitFailover(); 240 verifyConnections(); 241 proxyConn.connect(); 242 verifyResponse(proxyConn); 243 } finally { 244 webAppProxyServer.stop(); 245 } 246 } 247 248 @Test testEmbeddedWebAppProxy()249 public void testEmbeddedWebAppProxy() throws YarnException, 250 InterruptedException, IOException { 251 conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); 252 cluster.init(conf); 253 cluster.start(); 254 getAdminService(0).transitionToActive(req); 255 assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); 256 verifyConnections(); 257 258 // send httpRequest with fakeApplicationId 259 // expect to get "Not Found" response and 404 response code 260 URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId); 261 HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl 262 .openConnection(); 263 264 proxyConn.connect(); 265 verifyResponse(proxyConn); 266 267 explicitFailover(); 268 verifyConnections(); 269 proxyConn.connect(); 270 verifyResponse(proxyConn); 271 } 272 verifyResponse(HttpURLConnection response)273 private void verifyResponse(HttpURLConnection response) 274 throws IOException { 275 assertEquals("Not Found", response.getResponseMessage()); 276 assertEquals(404, response.getResponseCode()); 277 } 278 279 @Test testRMWebAppRedirect()280 public void testRMWebAppRedirect() throws YarnException, 281 InterruptedException, IOException { 282 cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 0, 1, 1); 283 conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); 284 285 cluster.init(conf); 286 cluster.start(); 287 getAdminService(0).transitionToActive(req); 288 String rm1Url = "http://0.0.0.0:18088"; 289 String rm2Url = "http://0.0.0.0:28088"; 290 291 String redirectURL = getRedirectURL(rm2Url); 292 // if uri is null, RMWebAppFilter will append a slash at the trail of the redirection url 293 assertEquals(redirectURL,rm1Url+"/"); 294 295 redirectURL = getRedirectURL(rm2Url + "/metrics"); 296 assertEquals(redirectURL,rm1Url + "/metrics"); 297 298 redirectURL = getRedirectURL(rm2Url + "/jmx"); 299 assertEquals(redirectURL,rm1Url + "/jmx"); 300 301 // standby RM links /conf, /stacks, /logLevel, /static, /logs, 302 // /cluster/cluster as well as webService 303 // /ws/v1/cluster/info should not be redirected to active RM 304 redirectURL = getRedirectURL(rm2Url + "/cluster/cluster"); 305 assertNull(redirectURL); 306 307 redirectURL = getRedirectURL(rm2Url + "/conf"); 308 assertNull(redirectURL); 309 310 redirectURL = getRedirectURL(rm2Url + "/stacks"); 311 assertNull(redirectURL); 312 313 redirectURL = getRedirectURL(rm2Url + "/logLevel"); 314 assertNull(redirectURL); 315 316 redirectURL = getRedirectURL(rm2Url + "/static"); 317 assertNull(redirectURL); 318 319 redirectURL = getRedirectURL(rm2Url + "/logs"); 320 assertNull(redirectURL); 321 322 redirectURL = getRedirectURL(rm2Url + "/ws/v1/cluster/info"); 323 assertNull(redirectURL); 324 325 redirectURL = getRedirectURL(rm2Url + "/ws/v1/cluster/apps"); 326 assertEquals(redirectURL, rm1Url + "/ws/v1/cluster/apps"); 327 328 redirectURL = getRedirectURL(rm2Url + "/proxy/" + fakeAppId); 329 assertNull(redirectURL); 330 331 // transit the active RM to standby 332 // Both of RMs are in standby mode 333 getAdminService(0).transitionToStandby(req); 334 // RM2 is expected to send the httpRequest to itself. 335 // The Header Field: Refresh is expected to be set. 336 redirectURL = getRefreshURL(rm2Url); 337 assertTrue(redirectURL != null 338 && redirectURL.contains(YarnWebParams.NEXT_REFRESH_INTERVAL) 339 && redirectURL.contains(rm2Url)); 340 341 } 342 343 // set up http connection with the given url and get the redirection url from the response 344 // return null if the url is not redirected getRedirectURL(String url)345 static String getRedirectURL(String url) { 346 String redirectUrl = null; 347 try { 348 HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); 349 // do not automatically follow the redirection 350 // otherwise we get too many redirections exception 351 conn.setInstanceFollowRedirects(false); 352 if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) 353 redirectUrl = conn.getHeaderField("Location"); 354 } catch (Exception e) { 355 // throw new RuntimeException(e); 356 } 357 return redirectUrl; 358 } 359 getRefreshURL(String url)360 static String getRefreshURL(String url) { 361 String redirectUrl = null; 362 try { 363 HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection(); 364 // do not automatically follow the redirection 365 // otherwise we get too many redirections exception 366 conn.setInstanceFollowRedirects(false); 367 redirectUrl = conn.getHeaderField("Refresh"); 368 } catch (Exception e) { 369 // throw new RuntimeException(e); 370 } 371 return redirectUrl; 372 } 373 } 374