1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.tools.rumen;
19 
20 import java.util.Set;
21 import java.util.HashSet;
22 import java.util.Properties;
23 import java.util.StringTokenizer;
24 
25 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
26 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
27 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
28 import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
29 import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
30 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
31 
32 /**
33  * Building the cluster topology.
34  */
35 public class TopologyBuilder {
36   private Set<ParsedHost> allHosts = new HashSet<ParsedHost>();
37 
38   /**
39    * Process one {@link HistoryEvent}
40    *
41    * @param event
42    *          The {@link HistoryEvent} to be processed.
43    */
process(HistoryEvent event)44   public void process(HistoryEvent event) {
45     if (event instanceof TaskAttemptFinishedEvent) {
46       processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
47     } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
48       processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
49     } else if (event instanceof TaskStartedEvent) {
50       processTaskStartedEvent((TaskStartedEvent) event);
51     } else if (event instanceof MapAttemptFinishedEvent) {
52       processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
53     } else if (event instanceof ReduceAttemptFinishedEvent) {
54       processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
55     }
56 
57     // I do NOT expect these if statements to be exhaustive.
58   }
59 
60   /**
61    * Process a collection of JobConf {@link Properties}. We do not restrict it
62    * to be called once.
63    *
64    * @param conf
65    *          The job conf properties to be added.
66    */
process(Properties conf)67   public void process(Properties conf) {
68     // no code
69   }
70 
71   /**
72    * Request the builder to build the final object. Once called, the
73    * {@link TopologyBuilder} would accept no more events or job-conf properties.
74    *
75    * @return Parsed {@link LoggedNetworkTopology} object.
76    */
build()77   public LoggedNetworkTopology build() {
78     return new LoggedNetworkTopology(allHosts);
79   }
80 
processTaskStartedEvent(TaskStartedEvent event)81   private void processTaskStartedEvent(TaskStartedEvent event) {
82     preferredLocationForSplits(event.getSplitLocations());
83   }
84 
processTaskAttemptUnsuccessfulCompletionEvent( TaskAttemptUnsuccessfulCompletionEvent event)85   private void processTaskAttemptUnsuccessfulCompletionEvent(
86       TaskAttemptUnsuccessfulCompletionEvent event) {
87     recordParsedHost(event.getHostname(), event.getRackName());
88   }
89 
processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event)90   private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
91     recordParsedHost(event.getHostname(), event.getRackName());
92   }
93 
processMapAttemptFinishedEvent(MapAttemptFinishedEvent event)94   private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
95     recordParsedHost(event.getHostname(), event.getRackName());
96   }
97 
processReduceAttemptFinishedEvent(ReduceAttemptFinishedEvent event)98   private void processReduceAttemptFinishedEvent(ReduceAttemptFinishedEvent event) {
99     recordParsedHost(event.getHostname(), event.getRackName());
100   }
101 
recordParsedHost(String hostName, String rackName)102   private void recordParsedHost(String hostName, String rackName) {
103     if (hostName == null) {
104       return;
105     }
106     ParsedHost result = null;
107     if (rackName == null) {
108       result = ParsedHost.parse(hostName);
109     } else {
110       result = new ParsedHost(rackName, hostName);
111     }
112 
113 
114     if (result != null && !allHosts.contains(result)) {
115       allHosts.add(result);
116     }
117   }
118 
recordParsedHost(String nodeName)119   private void recordParsedHost(String nodeName) {
120     ParsedHost result = ParsedHost.parse(nodeName);
121 
122     if (result != null && !allHosts.contains(result)) {
123       allHosts.add(result);
124     }
125   }
126 
preferredLocationForSplits(String splits)127   private void preferredLocationForSplits(String splits) {
128     if (splits != null) {
129       StringTokenizer tok = new StringTokenizer(splits, ",", false);
130 
131       while (tok.hasMoreTokens()) {
132         String nextSplit = tok.nextToken();
133 
134         recordParsedHost(nextSplit);
135       }
136     }
137   }
138 }
139