/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.rumen;

import java.util.Set;
import java.util.HashSet;
import java.util.Properties;
import java.util.StringTokenizer;

import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;

/**
 * Builds the cluster topology by collecting every host mentioned in the
 * job-history events it is given.
 */
public class TopologyBuilder {
  /** Every distinct host seen so far; handed to the topology in {@link #build()}. */
  private Set<ParsedHost> allHosts = new HashSet<ParsedHost>();

  /**
   * Process one {@link HistoryEvent}. Events of a type not listed below are
   * silently ignored.
   *
   * @param event
   *          The {@link HistoryEvent} to be processed.
   */
  public void process(HistoryEvent event) {
    // The checks are deliberately not exhaustive: only event types that
    // carry host information are dispatched. The order of the checks is
    // kept as-is so the first matching type wins.
    if (event instanceof TaskAttemptFinishedEvent) {
      processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
      return;
    }
    if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
      processTaskAttemptUnsuccessfulCompletionEvent(
          (TaskAttemptUnsuccessfulCompletionEvent) event);
      return;
    }
    if (event instanceof TaskStartedEvent) {
      processTaskStartedEvent((TaskStartedEvent) event);
      return;
    }
    if (event instanceof MapAttemptFinishedEvent) {
      processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
      return;
    }
    if (event instanceof ReduceAttemptFinishedEvent) {
      processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
    }
  }

  /**
   * Process a collection of JobConf {@link Properties}. It may be called any
   * number of times; the current implementation does nothing with the
   * properties.
   *
   * @param conf
   *          The job conf properties to be added.
   */
  public void process(Properties conf) {
    // no code
  }

  /**
   * Request the builder to build the final object. The builder is intended
   * not to be fed further events or job-conf properties after this call;
   * that expectation is not enforced here.
   *
   * @return A {@link LoggedNetworkTopology} constructed from all hosts
   *         recorded so far.
   */
  public LoggedNetworkTopology build() {
    return new LoggedNetworkTopology(allHosts);
  }

  /** Records the split locations carried by a task-started event. */
  private void processTaskStartedEvent(TaskStartedEvent event) {
    preferredLocationForSplits(event.getSplitLocations());
  }

  /** Records the host/rack reported by an unsuccessful task attempt. */
  private void processTaskAttemptUnsuccessfulCompletionEvent(
      TaskAttemptUnsuccessfulCompletionEvent event) {
    recordParsedHost(event.getHostname(), event.getRackName());
  }

  /** Records the host/rack reported by a finished task attempt. */
  private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
    recordParsedHost(event.getHostname(), event.getRackName());
  }

  /** Records the host/rack reported by a finished map attempt. */
  private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
    recordParsedHost(event.getHostname(), event.getRackName());
  }

  /** Records the host/rack reported by a finished reduce attempt. */
  private void processReduceAttemptFinishedEvent(ReduceAttemptFinishedEvent event) {
    recordParsedHost(event.getHostname(), event.getRackName());
  }

  /**
   * Records a host given separately as host name and rack name.
   *
   * @param hostName
   *          the host name; when {@code null} nothing is recorded.
   * @param rackName
   *          the rack name; when {@code null} the host name alone is handed
   *          to {@link ParsedHost#parse(String)}.
   */
  private void recordParsedHost(String hostName, String rackName) {
    if (hostName == null) {
      return;
    }

    ParsedHost parsed = (rackName == null)
        ? ParsedHost.parse(hostName)
        : new ParsedHost(rackName, hostName);

    addHostIfPresent(parsed);
  }

  /**
   * Records a host given as a single node-name string.
   *
   * @param nodeName
   *          the string handed to {@link ParsedHost#parse(String)}.
   */
  private void recordParsedHost(String nodeName) {
    addHostIfPresent(ParsedHost.parse(nodeName));
  }

  /**
   * Adds {@code host} to the collected set unless it is {@code null}.
   * A host that is already in the set is left untouched, because
   * {@link Set#add(Object)} does not replace an existing equal element.
   */
  private void addHostIfPresent(ParsedHost host) {
    if (host != null) {
      allHosts.add(host);
    }
  }

  /**
   * Records every location named in a comma-separated list.
   *
   * @param splits
   *          comma-separated locations; {@code null} is ignored.
   */
  private void preferredLocationForSplits(String splits) {
    if (splits == null) {
      return;
    }

    // Delimiters themselves are not returned as tokens.
    StringTokenizer locations = new StringTokenizer(splits, ",");
    while (locations.hasMoreTokens()) {
      recordParsedHost(locations.nextToken());
    }
  }
}