1#Licensed to the Apache Software Foundation (ASF) under one
2#or more contributor license agreements.  See the NOTICE file
3#distributed with this work for additional information
4#regarding copyright ownership.  The ASF licenses this file
5#to you under the Apache License, Version 2.0 (the
6#"License"); you may not use this file except in compliance
7#with the License.  You may obtain a copy of the License at
8
9#     http://www.apache.org/licenses/LICENSE-2.0
10
11#Unless required by applicable law or agreed to in writing, software
12#distributed under the License is distributed on an "AS IS" BASIS,
13#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#See the License for the specific language governing permissions and
15#limitations under the License.
16import errno, sys, os, traceback, stat, socket, re, warnings, signal
17
18from hodlib.Common.tcp import tcpSocket, tcpError
19from hodlib.Common.threads import simpleCommand
20
# Bit position of each special permission flag (setuid, setgid, sticky)
# within the leading octal digit of a 4-digit permission string.
setUGV = {
    'S_ISUID' : 2,
    'S_ISGID' : 1,
    'S_ISVTX' : 0,
}

# A backslash optionally followed by the character it escapes (group 1).
reEscapeSeq = re.compile(r"\\(.)?")

# Presumably the exit code / message used when HOD is interrupted.
HOD_INTERRUPTED_CODE = 127
HOD_INTERRUPTED_MESG = "Hod interrupted. Cleaning up and exiting"

# Regex with three numeric groups (requested, used, max limit); presumably
# matched against a Torque job comment -- confirm with callers.
TORQUE_USER_LIMITS_COMMENT_FIELD = ("User-limits exceeded. "
    "Requested:([0-9]*) Used:([0-9]*) MaxLimit:([0-9]*)")
TORQUE_USER_LIMITS_EXCEEDED_MSG = ("Requested number of nodes exceeded "
                                   "maximum user limits. ")
31
class AlarmException(Exception):
    """Exception holding a text message; repr() returns that message verbatim."""

    def __init__(self, msg=''):
        # The message is stored on the instance first, then handed to the
        # base class so it also appears in args / str().
        self.message = msg
        super(AlarmException, self).__init__(msg)

    def __repr__(self):
        return self.message
39
def isProcessRunning(pid):
    '''Check if a process is running, by sending it a 0 signal, and checking for errors.

    pid - id of the process to probe.

    Returns True if the process exists, including when it exists but we may
    not signal it (os.kill reports EPERM). Returns False otherwise, and always
    for pid <= 0.
    '''
    # This method is documented in some email threads on the python mailing list.
    # For e.g.: http://mail.python.org/pipermail/python-list/2002-May/144522.html

    # os.kill() treats pid 0 as "our process group" and negative pids as
    # process groups (-1 meaning every process we may signal). The probe
    # would report success for those even though no such single process
    # exists.
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except OSError:
        # Fetched via sys.exc_info() so the clause parses on every Python
        # version (no "except X, e" / "except X as e" dependency).
        err = sys.exc_info()[1]
        # EPERM: the process exists but belongs to someone else.
        return err.errno == errno.EPERM
    return True
49
def untar(file, targetDir):
    """Extract the gzip-compressed tarball ``file`` into ``targetDir``.

    Runs the external ``tar`` command through simpleCommand. It is started,
    then waited on and joined before the exit code is inspected.

    Returns True when tar exited with status 0, False otherwise.
    """
    # NOTE(review): both paths are interpolated into the command string
    # unquoted, so names with spaces or shell metacharacters may misbehave --
    # confirm how simpleCommand executes the string.
    untarCmd = simpleCommand('untar', 'tar -C %s -zxf %s' % (targetDir, file))
    untarCmd.start()
    untarCmd.wait()
    untarCmd.join()
    return untarCmd.exit_code() == 0
61
def tar(tarFile, tarDirectory, tarList):
    """Create the gzip-compressed archive ``tarFile`` from ``tarList``.

    The process working directory is switched to ``tarDirectory`` while the
    external ``tar`` command runs. Entries of ``tarList`` (and a relative
    ``tarFile``) are therefore resolved relative to it. The previous working
    directory is always restored, even if running the command raises.

    Returns True when tar exits with status 0. Otherwise returns the string
    from simpleCommand.exit_status_string() describing the failure.
    """
    currentDir = os.getcwd()
    os.chdir(tarDirectory)
    try:
        # Every member is followed by one space, exactly as the command line
        # was always built ("tar -czf ARCHIVE a b c ").
        # NOTE(review): names are not shell-quoted; confirm how simpleCommand
        # executes the string before passing untrusted names.
        command = 'tar -czf %s ' % (tarFile) + \
            ''.join(['%s ' % (member,) for member in tarList])

        commandObj = simpleCommand('tar', command)
        commandObj.start()
        commandObj.wait()
        commandObj.join()
        if commandObj.exit_code() == 0:
            return True
        return commandObj.exit_status_string()
    finally:
        # chdir is process-wide state: restore it on every exit path.
        os.chdir(currentDir)
83
def to_http_url(list):
    """Build an ``http://host:port`` URL from a ``[hostname, port]`` sequence.

    Only the first two items are used. Each is rendered with "%s", so the
    port may be an int or a string.
    """
    hostname = list[0]
    port = list[1]
    return "http://%s:%s" % (hostname, port)
90
def get_exception_string():
    """Return the formatted traceback of the exception currently being handled.

    The lines that traceback.format_exception() produces for sys.exc_info()
    are concatenated into a single string.
    """
    return ''.join(traceback.format_exception(*sys.exc_info()))
99
def get_exception_error_string():
  """Return a one-line "<type> <value>" summary of the exception being handled.

  When the exception value is false-y (e.g. no exception is active), the type
  taken from sys.exc_info() is returned unchanged instead of a string. That
  type is None when nothing is being handled.
  """
  excType, excValue = sys.exc_info()[:2]
  if not excValue:
    return excType
  return "%s %s" % (excType, excValue)
108
def check_timestamp(timeStamp):
    """ Checks the validity of a timeStamp.

        timeStamp - (YYYY-MM-DD HH:MM:SS in UTC)

        returns True if timeStamp parses with the format
        "%Y-%m-%d %H:%M:%S", False otherwise (including for non-string
        input). Only the layout and field ranges are checked; the time zone
        is not.
    """
    # 'time' is not among this module's top-level imports. Without this local
    # import, time.strptime raised NameError, which was silently caught, so
    # every timestamp was reported as invalid.
    import time

    try:
        time.strptime(timeStamp, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # ValueError: text does not match the format or a field is out of
        # range. TypeError: timeStamp is not a string (e.g. None).
        return False
    return True
124
def sig_wrapper(sigNum, handler, *args):
  """Call ``handler`` on behalf of a signal handler.

  sigNum is accepted for signal-handler compatibility but is not used. Extra
  positional arguments, when given, are passed to ``handler`` as ONE tuple
  argument (handler(args)), not unpacked. Otherwise handler() is called with
  no arguments.
  """
  if not args:
    handler()
    return
  # NOTE(review): handler(args) rather than handler(*args); existing callers
  # may rely on receiving the tuple, so the convention is preserved.
  handler(args)
130
def get_perms(filename):
    """Return the permission bits of ``filename`` as a 4-digit octal string.

    The first digit holds the special bits (setuid=4, setgid=2, sticky=1).
    The remaining three are the user, group and other rwx digits. A plain
    0755 file therefore yields "0755" and a sticky world-writable directory
    yields "1777". os.stat errors propagate to the caller.
    """
    permissionBits = stat.S_IMODE(os.stat(filename).st_mode)
    # S_IMODE keeps only the chmod-settable bits (mask 07777), i.e. exactly
    # four octal digits. Zero-padded octal formatting is the same string the
    # old per-bit loop assembled digit by digit.
    return "%04o" % permissionBits
154
def local_fqdn():
    """Return a system's true FQDN rather than any aliases, which are
       occasionally returned by socket.gethostname.

    The node name from os.uname() is resolved with socket.gethostbyname_ex().
    Among the returned aliases followed by the canonical name, the LAST dotted
    name starting with the node name wins. The canonical name is considered
    last, so it is preferred whenever it qualifies. If none qualifies, the
    bare node name is returned. Resolution errors propagate.
    """
    nodeName = os.uname()[1]
    canonical, aliases, _addresses = socket.gethostbyname_ex(nodeName)
    candidates = list(aliases) + [canonical]
    # Scanning backwards and stopping at the first hit selects the same name
    # as scanning forwards and keeping the last hit.
    for candidate in reversed(candidates):
        if "." in candidate and candidate.startswith(nodeName):
            return candidate
    return nodeName
169
def need_to_allocate(allocated, config, command):
    """Decide whether a cluster still has to be allocated for ``command``.

    allocated - object whose isSet() reports that an allocation already exists.
    config    - nested mapping; config['gridservice-hdfs']['external'] and
                config['gridservice-mapred']['external'] flag externally
                provided services.
    command   - command string to be run.

    Returns False when an allocation already exists, when the command
    contains 'dfs' and HDFS is external, or when Map/Reduce is external.
    Returns True otherwise.
    """
    if allocated.isSet():
        return False
    # Raw string: '\s' in a plain literal is an invalid escape sequence
    # (a warning on modern Python). Since re.search scans the whole string,
    # this is effectively "command contains 'dfs'".
    if re.search(r"\s*dfs.*$", command) and \
        config['gridservice-hdfs']['external']:
        return False
    # NOTE(review): a dfs command with a non-external HDFS still skips
    # allocation when Map/Reduce is external -- confirm that is intended.
    if config['gridservice-mapred']['external']:
        return False
    return True
182
def filter_warnings():
    """Ignore warnings whose text says 'with' will become a reserved keyword.

    Adds an 'ignore' entry to the process-wide warnings filter list.
    """
    ignoredMessage = ".*?'with' will become a reserved keyword.*"
    warnings.filterwarnings(action='ignore', message=ignoredMessage)
186
def args_to_string(list):
  """Return the items of ``list`` as one space-separated string.

  Each item is rendered with "%s" formatting; an empty list yields ''.
  """
  # str.join is linear, unlike repeatedly re-formatting an ever-growing
  # string and trimming the trailing separator at the end.
  return ' '.join(["%s" % (item,) for item in list])
193
def replace_escapes(object):
  r""" replace any escaped character. e.g \, with , \= with = and so on

  Only options whose definition in object._configDef has type 'keyval' are
  processed. Each such option's dict is rebuilt, with every backslash escape
  in its values collapsed to the escaped character. A backslash with nothing
  escapable after it (end of value, or before a newline) is dropped.
  """
  # A callable replacement instead of the r"\1" template. When the regex
  # matches a backslash with no following character, group 1 does not
  # participate, and Python < 3.5 raises "unmatched group" for the template.
  def _unescape(match):
    return match.group(1) or ''

  # here object is either a config object or a options object
  for section in object._mySections:
    for option in object._configDef[section].keys():
      if object[section].has_key(option):
        if object._configDef[section][option]['type'] == 'keyval':
          keyValDict = object[section][option]
          object[section][option] = {}
          for (key, value) in keyValDict.items():
            # sub() is a no-op on values without escapes, so no pre-search.
            object[section][option][key] = reEscapeSeq.sub(_unescape, value)
208
def hadoopVersion(hadoopDir, java_home, log):
  """Determine the version of the Hadoop installation under ``hadoopDir``.

  Runs "<hadoopDir>/bin/hadoop version" with JAVA_HOME set to ``java_home``
  and parses the first line of its output.

  Returns a dict {'major': ..., 'minor': ...}. The values are the digit
  strings captured from a first line of the form "Hadoop <major>.<minor>...".
  They stay None when the command fails, prints nothing, or the line does
  not match.
  """
  # Code earlier in idleTracker.py
  version = { 'major' : None, 'minor' : None }
  hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
  cmd = "%s version" % hadoopPath
  log.debug('Executing command %s to find hadoop version' % cmd)
  # Work on a copy: assigning into os.environ itself would change JAVA_HOME
  # for this whole process and for every child it launches afterwards.
  env = os.environ.copy()
  env['JAVA_HOME'] = java_home
  hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
  hadoopVerCmd.start()
  hadoopVerCmd.wait()
  hadoopVerCmd.join()
  if hadoopVerCmd.exit_code() != 0:
    return version
  output = hadoopVerCmd.output()
  if not output:
    # A successful run with no output leaves nothing to parse.
    return version
  verLine = output[0]
  log.debug('Version from hadoop command: %s' % verLine)
  verMatch = re.match(r"Hadoop ([0-9]+)\.([0-9]+).*", verLine)
  if verMatch is not None:
    version['major'] = verMatch.group(1)
    version['minor'] = verMatch.group(2)
  return version
231
232
def get_cluster_status(hdfsAddress, mapredAddress):
  """Determine the status of the cluster based on socket availability
     of HDFS and Map/Reduce.

  Each address is probed by opening and immediately closing a tcpSocket,
  Map/Reduce first and then HDFS. Returns:
     0  both reachable
    14  only Map/Reduce unreachable
    13  only HDFS unreachable
    10  neither reachable
  """
  def reachable(address):
    # The probe fails if either open() or close() raises tcpError.
    probe = tcpSocket(address)
    try:
      probe.open()
      probe.close()
    except tcpError:
      return False
    return True

  mapredUp = reachable(mapredAddress)
  hdfsUp = reachable(hdfsAddress)

  if mapredUp and hdfsUp:
    return 0
  if not (mapredUp or hdfsUp):
    return 10
  if mapredUp:
    return 13
  return 14
256
def parseEquals(list):
  """Turn a list of 'key=value' strings into a dict.

  e.g. ['a=b', 'c=d'] -> {'a': 'b', 'c': 'd'}. Used in
  GridService/{mapred.py/hdfs.py} and HodRing/hodring.py. Backslash-escaped
  '=' needs no special treatment, since all keys are generated by hod and
  don't contain such anomalies.

  Each element is split on its FIRST '=' only, so a value that itself
  contains '=' (e.g. 'opts=-Dx=y') is kept whole. An element without any '='
  raises IndexError. A later duplicate key overwrites an earlier one.
  """
  result = {}
  for elem in list:
    parts = elem.split('=', 1)
    result[parts[0]] = parts[1]
  return result
267
def getMapredSystemDirectory(mrSysDirRoot, userid, jobid):
  """Return mrSysDirRoot, userid, 'mapredsystem' and jobid joined with os.path.join."""
  components = (mrSysDirRoot, userid, 'mapredsystem', jobid)
  return os.path.join(*components)
270
class HodInterrupt:
  """Interrupt flag that can be raised by termination signals.

  init_signals() installs SIGTERM, SIGQUIT and SIGINT handlers. Each handler
  reports the signal through the logger passed to set_log() (if any) and then
  sets the flag. isSet() reads the flag.
  """

  def __init__(self):
    self.HodInterruptFlag = False
    self.log = None

  def set_log(self, log):
    """Use ``log`` (anything with a critical() method) to report caught signals."""
    self.log = log

  def init_signals(self):
    """Install SIGTERM, SIGQUIT and SIGINT handlers that set the flag."""

    # Defined before any handler is installed. Otherwise a signal arriving
    # before this def ran would hit an unbound closure variable (NameError).
    def sig_wrapper(sigNum, handler, *args):
      # Without this check a missing logger made self.log.critical raise
      # AttributeError inside the handler, so the flag was never set.
      if self.log is not None:
        self.log.critical("Caught signal %s." % sigNum )

      if args:
          handler(args)
      else:
          handler()

    def sigStop(sigNum, frame):
      sig_wrapper(sigNum, self.setFlag)

    signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
    signal.signal(signal.SIGQUIT, sigStop) # 3  : Quit program
    signal.signal(signal.SIGINT, sigStop)  # 2 ^C : Interrupt program

  def setFlag(self, val = True):
    """Set the interrupt flag to ``val`` (True by default)."""
    self.HodInterruptFlag = val

  def isSet(self):
    """Return the current value of the interrupt flag."""
    return self.HodInterruptFlag
301
class HodInterruptException(Exception):
  """Exception type for HOD interruptions; str() is repr() of ``value``.

  NOTE(review): presumably raised once hodInterrupt is set -- confirm with callers.
  """

  def __init__(self, value = ""):
    # Hand value to the base class so e.args and repr(e) carry it; before,
    # args stayed empty and repr(e) showed no detail.
    Exception.__init__(self, value)
    self.value = value

  def __str__(self):
    return repr(self.value)
308
# Module-level HodInterrupt instance created at import time; presumably the
# shared interrupt flag that other HOD modules import -- confirm with callers.
hodInterrupt = HodInterrupt()
310