/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.ServletException;
import java.io.IOException;
import java.io.DataOutputStream;
import java.util.Date;

/**
 * Base class for testing job end notification in local and cluster mode.
 *
 * Starts up Hadoop in local or cluster mode (by extending HadoopTestCase)
 * and starts a servlet engine that hosts a servlet which receives the
 * job completion notifications.
 *
 * The notification servlet returns an HTTP 400 the first time it is called
 * and an HTTP 200 the second time, thereby exercising the retry logic.
 *
 * In both cases the local file system is used (this is irrelevant to the
 * functionality under test).
 */
public abstract class NotificationTestCase extends HadoopTestCase {

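  // Debug helper; the println is disabled to keep the test output quiet.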
  private static void stdPrintln(String s) {
    //System.out.println(s);
  }

  protected NotificationTestCase(int mode) throws IOException {
    super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
  }

  private int port;
  private String contextPath = "/notification";
  private Class servletClass = NotificationServlet.class;
  private String servletPath = "/mapred";
  private Server webServer;

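  /**
   * Starts an embedded Jetty server on an ephemeral port (port 0), registers
   * the NotificationServlet under contextPath + servletPath and records the
   * port that was actually bound.
   */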
  private void startHttpServer() throws Exception {

    // Create the webServer
    if (webServer != null) {
      webServer.stop();
      webServer = null;
    }
    webServer = new Server(0);

    Context context = new Context(webServer, contextPath);

    // create servlet handler
    context.addServlet(new ServletHolder(new NotificationServlet()),
                       servletPath);

    // Start webServer
    webServer.start();
    port = webServer.getConnectors()[0].getLocalPort();

  }

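  /**
   * Stops and destroys the embedded Jetty server, if one is running.
   */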
  private void stopHttpServer() throws Exception {
    if (webServer != null) {
      webServer.stop();
      webServer.destroy();
      webServer = null;
    }
  }

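  /**
   * Servlet that receives the job end notifications.
   *
   * The static counter records how many notifications have arrived. On even
   * counts the servlet checks that the query string carries the expected job
   * status (SUCCEEDED, KILLED and FAILED for the first, second and third job
   * of the test) and answers with HTTP 400 to force a retry; on odd counts it
   * answers with HTTP 200. Each job therefore increments the counter by two.
   */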
  public static class NotificationServlet extends HttpServlet {
    public static volatile int counter = 0;

    protected void doGet(HttpServletRequest req, HttpServletResponse res)
      throws ServletException, IOException {
      switch (counter) {
        case 0:
        {
          assertTrue(req.getQueryString().contains("SUCCEEDED"));
        }
        break;
        case 2:
        {
          assertTrue(req.getQueryString().contains("KILLED"));
        }
        break;
        case 4:
        {
          assertTrue(req.getQueryString().contains("FAILED"));
        }
        break;
      }
      if (counter % 2 == 0) {
        stdPrintln((new Date()).toString() +
                   " Receiving First notification for [" + req.getQueryString() +
                   "], returning error");
        res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
      }
      else {
        stdPrintln((new Date()).toString() +
                   " Receiving Second notification for [" + req.getQueryString() +
                   "], returning OK");
        res.setStatus(HttpServletResponse.SC_OK);
      }
      counter++;
    }
  }

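  /**
   * Builds the notification URL template for the embedded server, e.g.
   * http://localhost:&lt;port&gt;/notification/mapred?jobId=$jobId&amp;jobStatus=$jobStatus.
   * The $jobId and $jobStatus placeholders are replaced by the framework when
   * the notification is sent.
   */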
  private String getNotificationUrlTemplate() {
    return "http://localhost:" + port + contextPath + servletPath +
      "?jobId=$jobId&jobStatus=$jobStatus";
  }

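  /**
   * Creates a JobConf that sends job end notifications to the embedded
   * servlet and allows 3 retry attempts at 200 ms intervals.
   */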
  protected JobConf createJobConf() {
    JobConf conf = super.createJobConf();
    conf.setJobEndNotificationURI(getNotificationUrlTemplate());
    conf.setInt("job.end.retry.attempts", 3);
    conf.setInt("job.end.retry.interval", 200);
    return conf;
  }

  protected void setUp() throws Exception {
    super.setUp();
    startHttpServer();
  }

  protected void tearDown() throws Exception {
    stopHttpServer();
    super.tearDown();
  }

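  /**
   * Runs three jobs: a successful word count, a killed job and a failed job.
   * After each job it polls NotificationServlet.counter in 50 ms steps, for up
   * to 30 tries, until both notifications for that job (the rejected first
   * attempt and the retried OK) have been received.
   */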
  public void testMR() throws Exception {
    System.out.println(launchWordCount(this.createJobConf(),
                                       "a b c d e f g h", 1, 1));
    boolean keepTrying = true;
    for (int tries = 0; tries < 30 && keepTrying; tries++) {
      Thread.sleep(50);
      keepTrying = !(NotificationServlet.counter == 2);
    }
    assertEquals(2, NotificationServlet.counter);

    Path inDir = new Path("notificationjob/input");
    Path outDir = new Path("notificationjob/output");

    // Hack for local FS that does not have the concept of a 'mounting point'
    if (isLocalFS()) {
      String localPathRoot = System.getProperty("test.build.data", "/tmp")
        .toString().replace(' ', '+');
      inDir = new Path(localPathRoot, inDir);
      outDir = new Path(localPathRoot, outDir);
    }

    // run a job with KILLED status
    System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
                                                outDir).getID());
    keepTrying = true;
    for (int tries = 0; tries < 30 && keepTrying; tries++) {
      Thread.sleep(50);
      keepTrying = !(NotificationServlet.counter == 4);
    }
    assertEquals(4, NotificationServlet.counter);

    // run a job with FAILED status
    System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
                                                outDir).getID());
    keepTrying = true;
    for (int tries = 0; tries < 30 && keepTrying; tries++) {
      Thread.sleep(50);
      keepTrying = !(NotificationServlet.counter == 6);
    }
    assertEquals(6, NotificationServlet.counter);
  }

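  /**
   * Writes the given input to a fresh input directory, configures a WordCount
   * job with the requested number of map and reduce tasks, runs it through
   * JobClient.runJob and returns the job output as a String.
   */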
  private String launchWordCount(JobConf conf,
                                 String input,
                                 int numMaps,
                                 int numReduces) throws IOException {
    Path inDir = new Path("testing/wc/input");
    Path outDir = new Path("testing/wc/output");

    // Hack for local FS that does not have the concept of a 'mounting point'
    if (isLocalFS()) {
      String localPathRoot = System.getProperty("test.build.data", "/tmp")
        .toString().replace(' ', '+');
      inDir = new Path(localPathRoot, inDir);
      outDir = new Path(localPathRoot, outDir);
    }

    FileSystem fs = FileSystem.get(conf);
    fs.delete(outDir, true);
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    {
      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
      file.writeBytes(input);
      file.close();
    }
    conf.setJobName("wordcount");
    conf.setInputFormat(TextInputFormat.class);

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCount.MapClass.class);
    conf.setCombinerClass(WordCount.Reduce.class);
    conf.setReducerClass(WordCount.Reduce.class);

    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(numMaps);
    conf.setNumReduceTasks(numReduces);
    JobClient.runJob(conf);
    return TestMiniMRWithDFS.readOutput(outDir, conf);
  }

}