1# coding: utf-8 2# flake8: noqa 3""" 4Error handlers for errors originating from the Submission systems. 5""" 6import re 7import abc 8 9from abc import ABCMeta, abstractmethod 10 11__author__ = "Michiel van Setten" 12__copyright__ = " " 13__version__ = "0.9" 14__maintainer__ = "Michiel van Setten" 15__email__ = "mjvansetten@gmail.com" 16__date__ = "May 2014" 17 18__all_errors__ = ['SubmitError', 'FullQueueError', 'DiskError', 'TimeCancelError', 'MemoryCancelError', 19 'NodeFailureError'] 20 21 22class CorrectorProtocolScheduler(metaclass=ABCMeta): 23 """ 24 Abstract class to define the protocol / interface for correction operators. The client code qadapters / submission 25 script generator method / ... should implement these methods. 26 """ 27 28 @property 29 @abc.abstractmethod 30 def name(self): 31 return str() 32 33 @abstractmethod 34 def exclude_nodes(self, nodes): 35 """ 36 Method to exclude certain nodes from being used in the calculation. It is called when a calculation seemed to 37 have been crashed due to a hardware failure at the nodes specified. 38 39 nodes: list of node numbers that were found to cause problems 40 41 returns True if the memory could be increased False otherwise 42 """ 43 44 @abstractmethod 45 def increase_mem(self): 46 """ 47 Method to increase then memory in the calculation. It is called when a calculation seemed to have been crashed 48 due to a insufficient memory. 49 50 returns True if the memory could be increased False otherwise 51 """ 52 53 @abstractmethod 54 def increase_time(self): 55 """ 56 Method to increase te time for the calculation. It is called when a calculation seemed to 57 have been crashed due to a time limit. 58 59 returns True if the memory could be increased False otherwise 60 """ 61 62 @abstractmethod 63 def increase_cpus(self): 64 """ 65 Method to increse the number of cpus being used in the calculation. It is called when a calculation seemed to 66 have been crashed due to time or memory limits being broken. 67 68 returns True if the memory could be increased False otherwise 69 """ 70 71 72class CorrectorProtocolApplication(metaclass=ABCMeta): 73 """ 74 Abstract class to define the protocol / interface for correction operators. The client code quadapters / submission 75 script generator method / ... should implement these methods. 76 """ 77 78 @property 79 @abc.abstractmethod 80 def name(self): 81 return str() 82 83 @abstractmethod 84 def decrease_mem(self): 85 """ 86 Method to increase then memory in the calculation. It is called when a calculation seemed to have been crashed 87 due to a insufficient memory. 88 89 returns True if the memory could be increased False otherwise 90 """ 91 92 @abstractmethod 93 def speed_up(self): 94 """ 95 Method to speed_up the calculation. It is called when a calculation seemed to time limits being broken. 96 97 returns True if the memory could be increased False otherwise 98 """ 99 100 101class AbstractError(metaclass=ABCMeta): 102 """ 103 Error base class 104 """ 105 106 def __init__(self, errmsg, meta_data): 107 self.errmsg = errmsg 108 self.meta_data = meta_data if meta_data is not None else {} 109 110 def __str__(self): 111 _message = '%s %s\n' \ 112 ' error message : %s \n' \ 113 ' meta data : %s' % (self.name, self.__doc__, self.errmsg, str(self.meta_data)) 114 return _message 115 116 @property 117 def name(self): 118 return self.__class__.__name__ 119 120 @property 121 def scheduler_adapter_solutions(self): 122 """ 123 to be implemented by concrete errors returning a list of tuples defining corrections. The First element of the 124 tuple should be a string of one of the methods in CorrectorProtocolScheduler, the second element should 125 contain the arguments. 126 """ 127 return [] 128 129 @property 130 def application_adapter_solutions(self): 131 """ 132 to be implemented by concrete errors returning a list of tuples defining corrections. The First element of the 133 tuple should be a string of one of the methods in CorrectorProtocolApplication, the second element should 134 contain the arguments. 135 """ 136 return [] 137 138 def last_resort_solution(self): 139 """ 140 what to do if every thing else fails... 141 """ 142 print('non of the defined solutions for %s returned success...' % self.name) 143 return 144 145 146class SubmitError(AbstractError): 147 """ 148 Errors occurring at submission. The limits on the cluster may have changed. 149 """ 150 151 152class FullQueueError(AbstractError): 153 """ 154 Errors occurring at submission. To many jobs in the queue / total cpus / .. . 155 """ 156 157 158class DiskError(AbstractError): 159 """ 160 Errors involving problems writing to disk. 161 """ 162 163 164class TimeCancelError(AbstractError): 165 """ 166 Error due to exceeding the time limit for the job. 167 .limit will return a list of limits that were broken, None if it could not be determined. 168 """ 169 170 @property 171 def limit(self): 172 return self.meta_data.get('broken_limit') 173 174 @property 175 def scheduler_adapter_solutions(self): 176 return [(CorrectorProtocolScheduler.increase_time,)] 177 178 @property 179 def application_adapter_solutions(self): 180 return [(CorrectorProtocolApplication.speed_up,)] 181 182 183class MemoryCancelError(AbstractError): 184 """ 185 Error due to exceeding the memory limit for the job. 186 .limit will return a list of limits that were broken, None if it could not be determined. 187 """ 188 189 @property 190 def limit(self): 191 return self.meta_data.get('broken_limit') 192 193 @property 194 def scheduler_adapter_solutions(self): 195 return [(CorrectorProtocolScheduler.increase_mem,)] 196 197 @property 198 def application_adapter_solutions(self): 199 return [(CorrectorProtocolApplication.decrease_mem,)] 200 201 202class MasterProcessMemoryCancelError(AbstractError): 203 """ 204 Error due to exceeding the memory limit for the job on the master node. 205 """ 206 207 208class SlaveProcessMemoryCancelError(AbstractError): 209 """ 210 Error due to exceeding the memory limit for the job on a node different from the master. 211 """ 212 213 214class NodeFailureError(AbstractError): 215 """ 216 Error due the hardware failure of a specific node. 217 .node will return a list of problematic nodes, None if it could not be determined. 218 """ 219 220 @property 221 def nodes(self): 222 return self.meta_data.get('nodes') 223 224 @property 225 def scheduler_adapter_solutions(self): 226 return [(CorrectorProtocolScheduler.exclude_nodes, [self.nodes])] 227 228 229class AbstractErrorParser(metaclass=ABCMeta): 230 """ 231 Abstract class for parsing errors originating from the scheduler system and error that are not reported by the 232 program itself, i.e. segmentation faults. 233 234 A concrete implementation of this class for a specific scheduler needs a class attribute ERRORS for containing a 235 dictionary specifying error: 236 237 ERRORS = {ErrorClass: { 238 'file_specifier' : { 239 'string': "the string to be looked for", 240 'meta_filter': "string specifing the regular expression to obtain the meta data" 241 } 242 } 243 244 """ 245 def __init__(self, err_file, out_file=None, run_err_file=None, batch_err_file=None): 246 self.files = {'err': err_file, 'out': out_file, 'run_err': run_err_file, 'batch_err': batch_err_file} 247 self.errors = [] 248 249 @property 250 @abc.abstractmethod 251 def error_definitions(self): 252 return dict() 253 254 @staticmethod 255 def extract_metadata(lines, meta_filter): 256 meta_dict = {} 257 for key in meta_filter.keys(): 258 values = [] 259 for line in lines: 260 match = re.match(meta_filter[key][0], line) 261 if match is not None: 262 values.append(re.match(meta_filter[key][0], line).group(meta_filter[key][1])) 263 values = sorted(set(values)) 264 meta_dict.update({key: values}) 265 return meta_dict 266 267 def parse_single(self, errmsg): 268 """ 269 Parse the provided files for the corresponding strings. 270 """ 271 found = False 272 message = None 273 metadata = None 274 for k in errmsg.keys(): 275 if self.files[k] is not None: 276 #print('parsing ', self.files[k], ' for ', errmsg[k]['string']) 277 try: 278 with open(self.files[k], mode='r') as f: 279 lines = f.read().split('\n') 280 for line in lines: 281 if errmsg[k]['string'] in line: 282 message = line 283 found = True 284 if found: 285 metadata = self.extract_metadata(lines, errmsg[k]['meta_filter']) 286 except (IOError, OSError): 287 print(self.files[k], 'not found') 288 pass 289 except TypeError: 290 print('type error', self.files[k], ' has type ', self.files[k].cls(), ' should be string.') 291 pass 292 293 return found, message, metadata 294 295 def parse(self): 296 """ 297 Parse for the occurens of all errors defined in ERRORS 298 """ 299 errors_tested = 0 300 for error in self.error_definitions: 301 errors_tested += 1 302 result = self.parse_single(self.error_definitions[error]) 303 if result[0]: 304 self.errors.append(error(result[1], result[2])) 305 if len(self.errors) > 0: 306 print('QUEUE_ERROR FOUND') 307 for error in self.errors: 308 print(error) 309 310 return errors_tested 311 312 313class SlurmErrorParser(AbstractErrorParser): 314 """ 315 Implementation of the error definitions for the Slurm scheduler 316 """ 317 @property 318 def error_definitions(self): 319 return { 320 SubmitError: { 321 'batch_err': { 322 'string': "Batch job submission failed", 323 'meta_filter': {} 324 } 325 }, 326 FullQueueError: { 327 'batch_err': { 328 'string': "Job violates accounting/QOS policy", 329 'meta_filter': {} 330 } 331 }, 332 MemoryCancelError: { 333 'err': { 334 'string': "Exceeded job memory limit", 335 'meta_filter': {} 336 } 337 }, 338#slurmstepd: error: *** JOB 1803480 CANCELLED AT 2015-12-16T14:57:32 DUE TO TIME LIMIT on lmWn009 *** 339#slurmstepd: error: *** JOB 1803712 CANCELLED AT 2015-12-17T15:21:41 DUE TO TIME LIMIT on lmWn001 *** 340 TimeCancelError: { 341 'err': { 342 'string': "DUE TO TIME LIMIT", 343 'meta_filter': { 344 'time_of_cancel': [r"(.*)JOB (\d+) CANCELLED AT (\S*) DUE TO TIME LIMIT(.*)", 3] 345 } 346 } 347 }, 348 NodeFailureError: { 349 'run_err': { 350 'string': "can't open /dev/ipath, network down", 351 'meta_filter': { 352 'nodes': [r"node(\d+)\.(\d+)can't open (\S*), network down \(err=26\)", 1] 353 } 354 } 355 }, 356 AbstractError: { 357 'out': { 358 'string': "a string to be found", 359 'meta_filter': {} 360 } 361 } 362 } 363 364 365class PBSErrorParser(AbstractErrorParser): 366 """ 367 Implementation for the PBS scheduler 368 PBS: job killed: walltime 932 exceeded limit 900 369 PBS: job killed: walltime 46 exceeded limit 30 370 PBS: job killed: vmem 2085244kb exceeded limit 1945600kb 371 """ 372 373 @property 374 def error_definitions(self): 375 return { 376 TimeCancelError: { 377 'out': { 378 'string': "job killed: walltime", 379 'meta_filter': { 380 'broken_limit': [r"=>> PBS: job killed: walltime (\d+) exceeded limit (\d+)", 2] 381 } 382 } 383 }, 384 AbstractError: { 385 'out': { 386 'string': "a string to be found", 387 'meta_filter': {} 388 } 389 }, 390 MemoryCancelError: { 391 'out': { 392 'string': "job killed: vmem", 393 'meta_filter': { 394 'broken_limit': [r"(.*)job killed: vmem (\d+)kb exceeded limit (\d+)kb", 3] 395 } 396 } 397 } 398 } 399 400 401ALL_PARSERS = {'slurm': SlurmErrorParser, 'pbspro': PBSErrorParser, 'torque': PBSErrorParser} 402 403 404def get_parser(scheduler, err_file, out_file=None, run_err_file=None, batch_err_file=None): 405 """ 406 Factory function to provide the parser for the specified scheduler. If the scheduler is not implemented None is 407 returned. The files, string, correspond to file names of the out and err files: 408 err_file stderr of the scheduler 409 out_file stdout of the scheduler 410 run_err_file stderr of the application 411 batch_err_file stderr of the submission 412 413 Returns: 414 None if scheduler is not supported. 415 """ 416 cls = ALL_PARSERS.get(scheduler) 417 return cls if cls is None else cls(err_file, out_file, run_err_file, batch_err_file) 418 419 420if __name__ == "__main__": 421 my_parser = get_parser('pbs', err_file='queue.err', out_file='queue.out', run_err_file='run.err', 422 batch_err_file='sbatch.err') 423 my_parser.parse() 424 print('parser.errors', my_parser.errors) 425 for my_error in my_parser.errors: 426 print(my_error) 427