#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
16"""define MapReduce as subclass of Service"""
17
18# -*- python -*-
19
20import os, copy, time
21
22from service import *
23from hodlib.Hod.nodePool import *
24from hodlib.Common.desc import CommandDesc
25from hodlib.Common.util import get_exception_string, parseEquals
26
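# Two services are defined here:
#  * MapReduceExternal - a proxy for a Map/Reduce cluster that HOD does not
#    manage itself, and
#  * MapReduce - a MasterSlave service that brings up a JobTracker and
#    TaskTrackers on HOD-allocated nodes.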
class MapReduceExternal(MasterSlave):
  """Dummy proxy to an external MapReduce instance."""

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    # The external cluster is already up, so mark the master as launched
    # and initialized from the start.
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

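  # Nothing is requested or launched for an external cluster: HOD needs no
  # master node and generates no master, admin or worker commands.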
  def getMasterRequest(self):
    return None

  def getMasterCommands(self, serviceDict):
    return []

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    return []

  def getMasterAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['mapred.job.tracker']
    return [addr]

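  # An external cluster has a fixed size; HOD never grows or shrinks it.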
  def needsMore(self):
    return 0

  def needsLess(self):
    return 0

  def setMasterParams(self, dict):
    self.serviceDesc['final-attrs']['mapred.job.tracker'] = "%s:%s" % (
      dict['host'], dict['tracker_port'])

    if self.version < 16:
      self.serviceDesc['final-attrs']['mapred.job.tracker.info.port'] = \
        str(dict['info_port'])
    else:
      # After HADOOP-2185
      self.serviceDesc['final-attrs']['mapred.job.tracker.http.address'] = \
        "%s:%s" % (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      addr = attrs['mapred.job.tracker']
      k, v = addr.split(":")
      infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
    else:
      # After HADOOP-2185
      # Note: earlier, we never respected mapred.job.tracker.http.address
      infoaddr = attrs['mapred.job.tracker.http.address']
    return [infoaddr]

class MapReduce(MasterSlave):
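  """Provisions a JobTracker and TaskTrackers on HOD-allocated nodes."""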

  def __init__(self, serviceDesc, workDirs, required_node, version,
               workers_per_ring=1):
    MasterSlave.__init__(self, serviceDesc, workDirs, required_node)

    self.masterNode = None
    self.masterAddr = None
    self.infoAddr = None
    self.workers = []
    self.required_node = required_node
    self.version = version
    self.workers_per_ring = workers_per_ring

  def isLaunchable(self, serviceDict):
    # The JobTracker can start only after the HDFS master is initialized.
    hdfs = serviceDict['hdfs']
    return hdfs.isMasterInitialized()

  def getMasterRequest(self):
    # Ask the node pool for a single node, with no constraints, to host
    # the JobTracker.
    req = NodeRequest(1, [], False)
    return req

  def getMasterCommands(self, serviceDict):
    hdfs = serviceDict['hdfs']

    cmdDesc = self._getJobTrackerCommand(hdfs)
    return [cmdDesc]

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    hdfs = serviceDict['hdfs']

    # One TaskTracker command per worker slot on each hodring.
    workerCmds = []
    for id in range(1, self.workers_per_ring + 1):
      workerCmds.append(self._getTaskTrackerCommand(str(id), hdfs))

    return workerCmds

  def setMasterNodes(self, list):
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    return [self.masterAddr]

  def getInfoAddrs(self):
    return [self.infoAddr]

  def getWorkers(self):
    return self.workers

  def requiredNode(self):
    return self.required_node

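  # 'list' holds the key=value strings reported back by the hodring when the
  # JobTracker comes up; for example (illustrative values only):
  #   ['mapred.job.tracker=node1:51234',
  #    'mapred.job.tracker.http.address=node1:51235']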
  def setMasterParams(self, list):
    dict = self._parseEquals(list)
    self.masterAddr = dict['mapred.job.tracker']
    k, v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
    else:
      # After HADOOP-2185
      self.infoAddr = dict['mapred.job.tracker.http.address']

  def _parseEquals(self, list):
    return parseEquals(list)

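  # Builds the per-service directory layout under each parent dir:
  #   <parent>/<subDir>/{mapred-local,mapred-system,mapred-temp,hadoop-tmp,dfs-client}
  # appending the directories HOD must create to 'workDirs' and wiring the
  # rest into 'attrs' (the final Hadoop config attributes).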
  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    local = []
    system = None
    temp = None
    hadooptmpdir = None
    dfsclient = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'mapred-local')
      local.append(dir)
      if not system:
        system = os.path.join(p, subDir, 'mapred-system')
      if not temp:
        temp = os.path.join(p, subDir, 'mapred-temp')
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
      dfsclientdir = os.path.join(p, subDir, 'dfs-client')
      dfsclient.append(dfsclientdir)
      workDirs.append(dfsclientdir)

    # FIXME!! use csv
    attrs['mapred.local.dir'] = ','.join(local)
    attrs['mapred.system.dir'] = 'fillindir'
    attrs['mapred.temp.dir'] = temp
    attrs['hadoop.tmp.dir'] = hadooptmpdir

    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"

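  # Values such as 'fillinhostport', 'fillinport' and 'fillindir' are
  # placeholder tokens; HOD substitutes real host/port/directory values when
  # the command is actually launched on the allocated node.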
  def _getJobTrackerCommand(self, hdfs):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    if 'mapred.job.tracker' not in attrs:
      attrs['mapred.job.tracker'] = 'fillinhostport'

    if self.version < 16:
      if 'mapred.job.tracker.info.port' not in attrs:
        attrs['mapred.job.tracker.info.port'] = 'fillinport'
    else:
      # Addressing HADOOP-2185
      if 'mapred.job.tracker.http.address' not in attrs:
        attrs['mapred.job.tracker.http.address'] = 'fillinhostport'

    attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-jt')

    dict = { 'name' : 'jobtracker' }
    dict['version'] = self.version
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['jobtracker']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    cmd = CommandDesc(dict)
    return cmd

  def _getTaskTrackerCommand(self, id, hdfs):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    jt = self.masterAddr

    if jt is None:
      raise ValueError("Can't get job tracker address")

    attrs['mapred.job.tracker'] = jt
    attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]

    if self.version < 16:
      if 'tasktracker.http.port' not in attrs:
        attrs['tasktracker.http.port'] = 'fillinport'
      # Prior to 16, tasktrackers always took ephemeral port 0 for
      # tasktracker.report.bindAddress
    else:
      # Added for HADOOP-2185
      if 'mapred.task.tracker.report.address' not in attrs:
        attrs['mapred.task.tracker.report.address'] = 'fillinhostport'
      if 'mapred.task.tracker.http.address' not in attrs:
        attrs['mapred.task.tracker.http.address'] = 'fillinhostport'

    # Make parentDirs unique per tasktracker, in case multiple tasktrackers
    # run on the same hodring.
    pd = []
    for dir in parentDirs:
      dir = dir + "-" + id
      pd.append(dir)
    parentDirs = pd

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')

    dict = { 'name' : 'tasktracker' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['tasktracker']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    cmd = CommandDesc(dict)
    return cmd