#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import errno, sys, os, traceback, stat, socket, re, warnings, signal, time

from hodlib.Common.tcp import tcpSocket, tcpError
from hodlib.Common.threads import simpleCommand

# Bit position (power of two) of each special mode flag in the leading
# digit produced by get_perms().
setUGV = { 'S_ISUID' : 2, 'S_ISGID' : 1, 'S_ISVTX' : 0 }

# Matches a backslash followed by an optional captured character.
reEscapeSeq = re.compile(r"\\(.)?")

HOD_INTERRUPTED_CODE = 127
HOD_INTERRUPTED_MESG = "Hod interrupted. Cleaning up and exiting"
TORQUE_USER_LIMITS_COMMENT_FIELD = "User-limits exceeded. " + \
        "Requested:([0-9]*) Used:([0-9]*) MaxLimit:([0-9]*)"
TORQUE_USER_LIMITS_EXCEEDED_MSG = "Requested number of nodes exceeded " + \
        "maximum user limits. "


class AlarmException(Exception):
    """Exception carrying a message that is also used as its repr()."""

    def __init__(self, msg=''):
        self.message = msg
        Exception.__init__(self, msg)

    def __repr__(self):
        return self.message


def isProcessRunning(pid):
    '''Check if a process is running, by sending it a 0 signal, and checking for errors'''
    # This method is documented in some email threads on the python mailing list.
    # For e.g.: http://mail.python.org/pipermail/python-list/2002-May/144522.html
    try:
        os.kill(pid, 0)
        return True
    except OSError:
        # Fetch the exception through sys.exc_info() so that the same source
        # parses on both old and new interpreters.
        err = sys.exc_info()[1]
        # EPERM means the signal could not be delivered for permission
        # reasons, i.e. the process exists.
        return err.errno == errno.EPERM


def untar(file, targetDir):
    """Run 'tar -C targetDir -zxf file' through simpleCommand.

    Returns True when the command exits with code 0, False otherwise.
    """
    command = 'tar -C %s -zxf %s' % (targetDir, file)
    commandObj = simpleCommand('untar', command)
    commandObj.start()
    commandObj.wait()
    commandObj.join()

    return commandObj.exit_code() == 0


def tar(tarFile, tarDirectory, tarList):
    """Create the gzipped archive tarFile from the entries in tarList.

    The command is run with tarDirectory as the working directory; the
    previous working directory is restored afterwards, even if running the
    command raises.

    Returns True on exit code 0. On failure the command's exit status
    string is returned instead -- NOTE that such a string is truthy, so
    callers must compare the result against True explicitly.
    """
    currentDir = os.getcwd()
    os.chdir(tarDirectory)
    try:
        # Every entry is followed by a single space, as in 'tar -czf f a b '.
        command = 'tar -czf %s ' % (tarFile) + \
                  ''.join(["%s " % entry for entry in tarList])

        commandObj = simpleCommand('tar', command)
        commandObj.start()
        commandObj.wait()
        commandObj.join()
        if commandObj.exit_code() == 0:
            status = True
        else:
            status = commandObj.exit_status_string()
    finally:
        os.chdir(currentDir)

    return status


def to_http_url(list):
    """convert [hostname, port] to a http url"""
    return "http://%s:%s" % (list[0], list[1])


def get_exception_string():
    """Return the formatted traceback of the exception being handled."""
    (type, value, tb) = sys.exc_info()
    return ''.join(traceback.format_exception(type, value, tb))


def get_exception_error_string():
    """Return "<type> <value>" for the exception being handled.

    When the exception value is false-y, the exception type object itself
    (not a string) is returned.
    """
    (type, value, tb) = sys.exc_info()
    if value:
        exceptString = "%s %s" % (type, value)
    else:
        exceptString = type

    return exceptString


def check_timestamp(timeStamp):
    """ Checks the validity of a timeStamp.

        timeStamp - (YYYY-MM-DD HH:MM:SS in UTC)

        returns True or False
    """
    try:
        time.strptime(timeStamp, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # ValueError: the string does not match the format.
        # TypeError: timeStamp is not a string at all.
        return False

    return True


def sig_wrapper(sigNum, handler, *args):
    """Invoke handler, forwarding any extra arguments.

    NOTE: the extra arguments are handed to handler as ONE tuple argument,
    not unpacked. sigNum is not used here.
    """
    if args:
        handler(args)
    else:
        handler()


def get_perms(filename):
    """Return the permissions of filename as a 4-digit octal-style string.

    The first digit encodes setuid/setgid/sticky (4/2/1); the remaining
    three encode read/write/execute (4/2/1) for user, group and other.
    """
    mode = stat.S_IMODE(os.stat(filename)[stat.ST_MODE])
    permsString = ''
    for who in "USR", "GRP", "OTH":
        permSet = 0
        place = 2
        for what in "R", "W", "X":
            if mode & getattr(stat, "S_I" + what + who):
                permSet = permSet + 2**place
            place = place - 1

        permsString = "%s%s" % (permsString, permSet)

    permSet = 0
    for permFlag in setUGV:
        if mode & getattr(stat, permFlag):
            permSet = permSet + 2**setUGV[permFlag]

    return "%s%s" % (permSet, permsString)


def local_fqdn():
    """Return a system's true FQDN rather than any aliases, which are
       occasionally returned by socket.gethostname."""

    fqdn = None
    me = os.uname()[1]
    nameInfo = socket.gethostbyname_ex(me)
    # Consider the canonical name together with the aliases.
    nameInfo[1].append(nameInfo[0])
    for name in nameInfo[1]:
        # The last dotted name that starts with the node name wins.
        if name.count(".") and name.startswith(me):
            fqdn = name
    if fqdn is None:
        fqdn = me
    return fqdn


def need_to_allocate(allocated, config, command):
    """Return False when no allocation is needed, True otherwise.

    No allocation is needed when `allocated` is already set, when the
    command mentions dfs and HDFS is configured as external, or when
    Map/Reduce is configured as external.
    """
    status = True

    if allocated.isSet():
        status = False
    elif re.search(r"\s*dfs.*$", command) and \
            config['gridservice-hdfs']['external']:
        status = False
    elif config['gridservice-mapred']['external']:
        status = False

    return status


def filter_warnings():
    """Suppress the "'with' will become a reserved keyword" warning."""
    warnings.filterwarnings('ignore',
        message=".*?'with' will become a reserved keyword.*")


def args_to_string(list):
    """return a string argument space seperated"""
    return ' '.join(["%s" % item for item in list])


def replace_escapes(object):
    r""" replace any escaped character. e.g \, with , \= with = and so on """
    # here object is either a config object or a options object
    for section in object._mySections:
        for option in object._configDef[section].keys():
            if object[section].has_key(option):
                if object._configDef[section][option]['type'] == 'keyval':
                    keyValDict = object[section][option]
                    object[section][option] = {}
                    for (key, value) in keyValDict.iteritems():
                        # sub() returns the value unchanged when nothing
                        # matches, so no separate search() is needed.
                        object[section][option][key] = \
                            reEscapeSeq.sub(r"\1", value)


def hadoopVersion(hadoopDir, java_home, log):
    """Return {'major': ..., 'minor': ...} for the hadoop under hadoopDir.

    Runs '<hadoopDir>/bin/hadoop version' and parses the first line of its
    output. Both entries stay None when the command fails, prints nothing,
    or the first line does not look like 'Hadoop <major>.<minor>...'.

    NOTE: JAVA_HOME is written into os.environ itself, so the setting
    remains visible to the rest of the process after this call.
    """
    # Determine the version of hadoop being used by executing the
    # hadoop version command. Code earlier in idleTracker.py
    version = { 'major' : None, 'minor' : None }
    hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
    cmd = "%s version" % hadoopPath
    log.debug('Executing command %s to find hadoop version' % cmd)
    env = os.environ
    env['JAVA_HOME'] = java_home
    hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
    hadoopVerCmd.start()
    hadoopVerCmd.wait()
    hadoopVerCmd.join()
    if hadoopVerCmd.exit_code() == 0:
        output = hadoopVerCmd.output()
        # Guard against a successful run that produced no output.
        if output:
            verLine = output[0]
            log.debug('Version from hadoop command: %s' % verLine)
            hadoopVerRegExp = re.compile(r"Hadoop ([0-9]+)\.([0-9]+).*")
            verMatch = hadoopVerRegExp.match(verLine)
            if verMatch is not None:
                version['major'] = verMatch.group(1)
                version['minor'] = verMatch.group(2)
    return version


def get_cluster_status(hdfsAddress, mapredAddress):
    """Determine the status of the cluster based on socket availability
       of HDFS and Map/Reduce.

       Returns 0 when both sockets open, 14 when only Map/Reduce fails,
       13 when only HDFS fails and 10 when both fail.
    """
    status = 0

    mapredSocket = tcpSocket(mapredAddress)
    try:
        mapredSocket.open()
        mapredSocket.close()
    except tcpError:
        status = 14

    hdfsSocket = tcpSocket(hdfsAddress)
    try:
        hdfsSocket.open()
        hdfsSocket.close()
    except tcpError:
        if status > 0:
            status = 10
        else:
            status = 13

    return status


def parseEquals(list):
    # takes in a list of keyval pairs e.g ['a=b','c=d'] and returns a
    # dict e.g {'a'='b','c'='d'}. Used in GridService/{mapred.py/hdfs.py} and
    # HodRing/hodring.py. No need for specially treating escaped =. as in \=,
    # since all keys are generated by hod and don't contain such anomalies
    dict = {}
    for elems in list:
        # Split on the first '=' only, so that a value which itself
        # contains '=' is kept whole instead of being truncated.
        splits = elems.split('=', 1)
        dict[splits[0]] = splits[1]
    return dict


def getMapredSystemDirectory(mrSysDirRoot, userid, jobid):
    """Return <mrSysDirRoot>/<userid>/mapredsystem/<jobid>."""
    return os.path.join(mrSysDirRoot, userid, 'mapredsystem', jobid)


class HodInterrupt:
    """Flag that is raised when SIGTERM, SIGQUIT or SIGINT is received."""

    def __init__(self):
        self.HodInterruptFlag = False
        self.log = None

    def set_log(self, log):
        self.log = log

    def init_signals(self):
        """Install handlers that log the signal and set the flag."""

        # Defined before the handlers are installed so that a signal
        # arriving during installation never finds this name unbound.
        def sig_wrapper(sigNum, handler, *args):
            # The logger is optional until set_log() has been called.
            if self.log is not None:
                self.log.critical("Caught signal %s." % sigNum )

            if args:
                handler(args)
            else:
                handler()

        def sigStop(sigNum, frame):
            sig_wrapper(sigNum, self.setFlag)

        signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
        signal.signal(signal.SIGQUIT, sigStop) # 3  : Quit program
        signal.signal(signal.SIGINT, sigStop)  # 2 ^C : Interrupt program

    def setFlag(self, val = True):
        self.HodInterruptFlag = val

    def isSet(self):
        return self.HodInterruptFlag


class HodInterruptException(Exception):
    """Exception whose str() is the repr() of the value it was given."""

    def __init__(self, value = ""):
        self.value = value

    def __str__(self):
        return repr(self.value)


# Module-level instance of HodInterrupt.
hodInterrupt = HodInterrupt()