#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""define MapReduce as subclass of Service"""

# -*- python -*-

import os, copy, time

from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
from hodlib.Common.util import get_exception_string, parseEquals

class MapReduceExternal(MasterSlave):
  """Dummy proxy to an external Map/Reduce instance.

  HOD launches nothing for this service; it only exposes the address of
  a JobTracker that is already running, as recorded in the service
  descriptor's final attributes.
  """

  def __init__(self, serviceDesc, workDirs, version):
    # No required node: the master runs outside of HOD's control.
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    # The external JobTracker is assumed up, so the master is treated
    # as already launched and initialized.
    self.launchedMaster = True
    self.masterInitialized = True
    # Hadoop minor version (e.g. 15 or 16); several methods branch on
    # this because config key names changed in 0.16 (HADOOP-2185).
    self.version = version

  def getMasterRequest(self):
    """Return None: no node needs to be allocated for the master."""
    return None

  def getMasterCommands(self, serviceDict):
    """Return no commands: the external master is not launched by HOD."""
    return []

  def getAdminCommands(self, serviceDict):
    """Return no admin commands for the external instance."""
    return []

  def getWorkerCommands(self, serviceDict):
    """Return no worker commands: tasktrackers are managed externally."""
    return []

  def getMasterAddrs(self):
    """Return the configured JobTracker address as a one-element list."""
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['mapred.job.tracker']
    return [addr]

  def needsMore(self):
    """External service never asks for more nodes."""
    return 0

  def needsLess(self):
    """External service never gives up nodes."""
    return 0

  def setMasterParams(self, dict):
    """Record the master's host and ports in the service descriptor.

    dict must contain 'host', 'tracker_port' and (for version >= 16)
    'info_port'.
    """
    # Fix: consistently go through serviceDesc.dict - the original mixed
    # direct subscription of serviceDesc with .dict access for the very
    # same attribute table within this one method.
    self.serviceDesc.dict['final-attrs']['mapred.job.tracker'] = \
      "%s:%s" % (dict['host'], dict['tracker_port'])

    if self.version < 16:
      # Pre-0.16: web UI port is a separate, port-only attribute.
      self.serviceDesc.dict['final-attrs']['mapred.job.tracker.info.port'] = \
        str(self.serviceDesc.dict['info_port'])
    else:
      # After Hadoop-2185: single host:port key for the web UI.
      self.serviceDesc.dict['final-attrs']['mapred.job.tracker.http.address'] = \
        "%s:%s" % (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
    """Return the JobTracker web UI (info) address as a one-element list."""
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      # Pre-0.16: combine the tracker's host with the separate info port.
      addr = attrs['mapred.job.tracker']
      k, v = addr.split(":")
      infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
    else:
      # After Hadoop-2185
      # Note: earlier,we never respected mapred.job.tracker.http.address
      infoaddr = attrs['mapred.job.tracker.http.address']
    return [infoaddr]

class MapReduce(MasterSlave):
  """Map/Reduce service launched and managed by HOD.

  Generates the jobtracker (master) and tasktracker (worker) command
  descriptors, and tracks the master's RPC and web UI addresses.
  """

  def __init__(self, serviceDesc, workDirs, required_node, version,
               workers_per_ring = 1):
    MasterSlave.__init__(self, serviceDesc, workDirs, required_node)

    self.masterNode = None   # host running the jobtracker
    self.masterAddr = None   # host:port of the jobtracker RPC endpoint
    self.infoAddr = None     # host:port of the jobtracker web UI
    self.workers = []
    self.required_node = required_node
    # Hadoop minor version; key names changed in 0.16 (HADOOP-2185).
    self.version = version
    # Number of tasktrackers to launch per hodring.
    self.workers_per_ring = workers_per_ring

  def isLaunchable(self, serviceDict):
    """Map/Reduce may start only once HDFS's master is initialized."""
    hdfs = serviceDict['hdfs']
    return bool(hdfs.isMasterInitialized())

  def getMasterRequest(self):
    """Request a single, unconstrained node for the jobtracker."""
    return NodeRequest(1, [], False)

  def getMasterCommands(self, serviceDict):
    """Return the command descriptor list for the jobtracker."""
    hdfs = serviceDict['hdfs']
    return [self._getJobTrackerCommand(hdfs)]

  def getAdminCommands(self, serviceDict):
    """No admin commands are needed for Map/Reduce."""
    return []

  def getWorkerCommands(self, serviceDict):
    """Return one tasktracker command per worker on this ring."""
    hdfs = serviceDict['hdfs']
    workerCmds = []
    # ids are 1-based; each id is appended to the work dirs so multiple
    # tasktrackers on one hodring do not share directories.
    for id in range(1, self.workers_per_ring + 1):
      workerCmds.append(self._getTaskTrackerCommand(str(id), hdfs))
    return workerCmds

  def setMasterNodes(self, list):
    """Record the first allocated node as the master node."""
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    """Return the jobtracker RPC address as a one-element list."""
    return [self.masterAddr]

  def getInfoAddrs(self):
    """Return the jobtracker web UI address as a one-element list."""
    return [self.infoAddr]

  def getWorkers(self):
    """Return the list of worker nodes."""
    return self.workers

  def requiredNode(self):
    """Return the node this service was required to run on (may be None)."""
    # Bug fix: this returned self.required_host, an attribute that is
    # never set anywhere; __init__ stores the value as required_node,
    # so calling this method always raised AttributeError.
    return self.required_node

  def setMasterParams(self, list):
    """Parse 'key=value' lines from the master and record its addresses."""
    dict = self._parseEquals(list)
    self.masterAddr = dict['mapred.job.tracker']
    k, v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      # Pre-0.16: info port is reported separately; reuse tracker host.
      self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
    else:
      # After Hadoop-2185 the web UI address is reported directly.
      self.infoAddr = dict['mapred.job.tracker.http.address']

  def _parseEquals(self, list):
    # Thin wrapper so subclasses/tests can override parsing.
    return parseEquals(list)

  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    """Populate workDirs/attrs/envs with per-node Map/Reduce directories.

    For every parent dir, creates mapred-local and dfs-client entries
    (plus, from the first parent only, mapred-system / mapred-temp /
    hadoop-tmp) under subDir, appending them to workDirs and wiring
    them into attrs. Mutates workDirs, envs and attrs in place.
    """
    local = []
    system = None
    temp = None
    hadooptmpdir = None
    dfsclient = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'mapred-local')
      local.append(dir)
      if not system:
        system = os.path.join(p, subDir, 'mapred-system')
      if not temp:
        temp = os.path.join(p, subDir, 'mapred-temp')
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
      dfsclientdir = os.path.join(p, subDir, 'dfs-client')
      dfsclient.append(dfsclientdir)
      workDirs.append(dfsclientdir)

    # FIXME!! use csv
    attrs['mapred.local.dir'] = ','.join(local)
    # 'fillindir' is a placeholder substituted later in the launch chain.
    attrs['mapred.system.dir'] = 'fillindir'
    attrs['mapred.temp.dir'] = temp
    attrs['hadoop.tmp.dir'] = hadooptmpdir

    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"

  def _getJobTrackerCommand(self, hdfs):
    """Build the CommandDesc that launches the jobtracker."""
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    # 'fillinhostport'/'fillinport' placeholders are replaced with real
    # host/port values when the command is materialized on a node.
    if 'mapred.job.tracker' not in attrs:
      attrs['mapred.job.tracker'] = 'fillinhostport'

    if self.version < 16:
      if 'mapred.job.tracker.info.port' not in attrs:
        attrs['mapred.job.tracker.info.port'] = 'fillinport'
    else:
      # Addressing Hadoop-2185: single host:port key for the web UI.
      if 'mapred.job.tracker.http.address' not in attrs:
        attrs['mapred.job.tracker.http.address'] = 'fillinhostport'

    attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-jt')

    dict = { 'name' : 'jobtracker' }
    dict['version'] = self.version
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['jobtracker']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    return CommandDesc(dict)

  def _getTaskTrackerCommand(self, id, hdfs):
    """Build the CommandDesc that launches tasktracker number `id`.

    Raises ValueError if the jobtracker address is not yet known.
    """
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    jt = self.masterAddr

    if jt is None:
      # Fix: 'raise ValueError, msg' is Python-2-only syntax; the call
      # form is equivalent and forward-compatible.
      raise ValueError("Can't get job tracker address")

    attrs['mapred.job.tracker'] = jt
    attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]

    if self.version < 16:
      if 'tasktracker.http.port' not in attrs:
        attrs['tasktracker.http.port'] = 'fillinport'
      # earlier to 16, tasktrackers always took ephemeral port 0 for
      # tasktracker.report.bindAddress
    else:
      # Adding the following. Hadoop-2185
      if 'mapred.task.tracker.report.address' not in attrs:
        attrs['mapred.task.tracker.report.address'] = 'fillinhostport'
      if 'mapred.task.tracker.http.address' not in attrs:
        attrs['mapred.task.tracker.http.address'] = 'fillinhostport'

    # unique parentDirs in case of multiple tasktrackers per hodring
    parentDirs = [dir + "-" + id for dir in parentDirs]

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')

    dict = { 'name' : 'tasktracker' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['tasktracker']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    return CommandDesc(dict)