1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapred; 20 21 import org.mortbay.jetty.Server; 22 import org.mortbay.jetty.servlet.Context; 23 import org.mortbay.jetty.servlet.ServletHolder; 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.fs.FileSystem; 26 import org.apache.hadoop.io.Text; 27 import org.apache.hadoop.io.IntWritable; 28 29 import javax.servlet.http.HttpServletRequest; 30 import javax.servlet.http.HttpServletResponse; 31 import javax.servlet.http.HttpServlet; 32 import javax.servlet.ServletException; 33 import java.io.IOException; 34 import java.io.DataOutputStream; 35 import java.util.Date; 36 37 /** 38 * Base class to test Job end notification in local and cluster mode. 39 * 40 * Starts up hadoop on Local or Cluster mode (by extending of the 41 * HadoopTestCase class) and it starts a servlet engine that hosts 42 * a servlet that will receive the notification of job finalization. 43 * 44 * The notification servlet returns a HTTP 400 the first time is called 45 * and a HTTP 200 the second time, thus testing retry. 46 * 47 * In both cases local file system is used (this is irrelevant for 48 * the tested functionality) 49 * 50 * 51 */ 52 public abstract class NotificationTestCase extends HadoopTestCase { 53 stdPrintln(String s)54 private static void stdPrintln(String s) { 55 //System.out.println(s); 56 } 57 NotificationTestCase(int mode)58 protected NotificationTestCase(int mode) throws IOException { 59 super(mode, HadoopTestCase.LOCAL_FS, 1, 1); 60 } 61 62 private int port; 63 private String contextPath = "/notification"; 64 private Class servletClass = NotificationServlet.class; 65 private String servletPath = "/mapred"; 66 private Server webServer; 67 startHttpServer()68 private void startHttpServer() throws Exception { 69 70 // Create the webServer 71 if (webServer != null) { 72 webServer.stop(); 73 webServer = null; 74 } 75 webServer = new Server(0); 76 77 Context context = new Context(webServer, contextPath); 78 79 // create servlet handler 80 context.addServlet(new ServletHolder(new NotificationServlet()), 81 servletPath); 82 83 // Start webServer 84 webServer.start(); 85 port = webServer.getConnectors()[0].getLocalPort(); 86 87 } 88 stopHttpServer()89 private void stopHttpServer() throws Exception { 90 if (webServer != null) { 91 webServer.stop(); 92 webServer.destroy(); 93 webServer = null; 94 } 95 } 96 97 public static class NotificationServlet extends HttpServlet { 98 public static volatile int counter = 0; 99 doGet(HttpServletRequest req, HttpServletResponse res)100 protected void doGet(HttpServletRequest req, HttpServletResponse res) 101 throws ServletException, IOException { 102 switch (counter) { 103 case 0: 104 { 105 assertTrue(req.getQueryString().contains("SUCCEEDED")); 106 } 107 break; 108 case 2: 109 { 110 assertTrue(req.getQueryString().contains("KILLED")); 111 } 112 break; 113 case 4: 114 { 115 assertTrue(req.getQueryString().contains("FAILED")); 116 } 117 break; 118 } 119 if (counter % 2 == 0) { 120 stdPrintln((new Date()).toString() + 121 "Receiving First notification for [" + req.getQueryString() + 122 "], returning error"); 123 res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error"); 124 } 125 else { 126 stdPrintln((new Date()).toString() + 127 "Receiving Second notification for [" + req.getQueryString() + 128 "], returning OK"); 129 res.setStatus(HttpServletResponse.SC_OK); 130 } 131 counter++; 132 } 133 } 134 getNotificationUrlTemplate()135 private String getNotificationUrlTemplate() { 136 return "http://localhost:" + port + contextPath + servletPath + 137 "?jobId=$jobId&jobStatus=$jobStatus"; 138 } 139 createJobConf()140 protected JobConf createJobConf() { 141 JobConf conf = super.createJobConf(); 142 conf.setJobEndNotificationURI(getNotificationUrlTemplate()); 143 conf.setInt("job.end.retry.attempts", 3); 144 conf.setInt("job.end.retry.interval", 200); 145 return conf; 146 } 147 148 setUp()149 protected void setUp() throws Exception { 150 super.setUp(); 151 startHttpServer(); 152 } 153 tearDown()154 protected void tearDown() throws Exception { 155 stopHttpServer(); 156 super.tearDown(); 157 } 158 testMR()159 public void testMR() throws Exception { 160 System.out.println(launchWordCount(this.createJobConf(), 161 "a b c d e f g h", 1, 1)); 162 boolean keepTrying = true; 163 for (int tries = 0; tries < 30 && keepTrying; tries++) { 164 Thread.sleep(50); 165 keepTrying = !(NotificationServlet.counter == 2); 166 } 167 assertEquals(2, NotificationServlet.counter); 168 169 Path inDir = new Path("notificationjob/input"); 170 Path outDir = new Path("notificationjob/output"); 171 172 // Hack for local FS that does not have the concept of a 'mounting point' 173 if (isLocalFS()) { 174 String localPathRoot = System.getProperty("test.build.data","/tmp") 175 .toString().replace(' ', '+');; 176 inDir = new Path(localPathRoot, inDir); 177 outDir = new Path(localPathRoot, outDir); 178 } 179 180 // run a job with KILLED status 181 System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir, 182 outDir).getID()); 183 keepTrying = true; 184 for (int tries = 0; tries < 30 && keepTrying; tries++) { 185 Thread.sleep(50); 186 keepTrying = !(NotificationServlet.counter == 4); 187 } 188 assertEquals(4, NotificationServlet.counter); 189 190 // run a job with FAILED status 191 System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir, 192 outDir).getID()); 193 keepTrying = true; 194 for (int tries = 0; tries < 30 && keepTrying; tries++) { 195 Thread.sleep(50); 196 keepTrying = !(NotificationServlet.counter == 6); 197 } 198 assertEquals(6, NotificationServlet.counter); 199 } 200 launchWordCount(JobConf conf, String input, int numMaps, int numReduces)201 private String launchWordCount(JobConf conf, 202 String input, 203 int numMaps, 204 int numReduces) throws IOException { 205 Path inDir = new Path("testing/wc/input"); 206 Path outDir = new Path("testing/wc/output"); 207 208 // Hack for local FS that does not have the concept of a 'mounting point' 209 if (isLocalFS()) { 210 String localPathRoot = System.getProperty("test.build.data","/tmp") 211 .toString().replace(' ', '+');; 212 inDir = new Path(localPathRoot, inDir); 213 outDir = new Path(localPathRoot, outDir); 214 } 215 216 FileSystem fs = FileSystem.get(conf); 217 fs.delete(outDir, true); 218 if (!fs.mkdirs(inDir)) { 219 throw new IOException("Mkdirs failed to create " + inDir.toString()); 220 } 221 { 222 DataOutputStream file = fs.create(new Path(inDir, "part-0")); 223 file.writeBytes(input); 224 file.close(); 225 } 226 conf.setJobName("wordcount"); 227 conf.setInputFormat(TextInputFormat.class); 228 229 // the keys are words (strings) 230 conf.setOutputKeyClass(Text.class); 231 // the values are counts (ints) 232 conf.setOutputValueClass(IntWritable.class); 233 234 conf.setMapperClass(WordCount.MapClass.class); 235 conf.setCombinerClass(WordCount.Reduce.class); 236 conf.setReducerClass(WordCount.Reduce.class); 237 238 FileInputFormat.setInputPaths(conf, inDir); 239 FileOutputFormat.setOutputPath(conf, outDir); 240 conf.setNumMapTasks(numMaps); 241 conf.setNumReduceTasks(numReduces); 242 JobClient.runJob(conf); 243 return TestMiniMRWithDFS.readOutput(outDir, conf); 244 } 245 246 } 247