1# coding: utf-8
2# flake8: noqa
3"""
4Error handlers for errors originating from the Submission systems.
5"""
6import re
7import abc
8
9from abc import ABCMeta, abstractmethod
10
__author__ = "Michiel van Setten"
__copyright__ = " "
__version__ = "0.9"
__maintainer__ = "Michiel van Setten"
__email__ = "mjvansetten@gmail.com"
__date__ = "May 2014"

# NOTE(review): nonstandard name (the Python convention is __all__) and the list omits
# MasterProcessMemoryCancelError and SlaveProcessMemoryCancelError, which are defined below —
# confirm whether that omission is intentional before relying on this list.
__all_errors__ = ['SubmitError', 'FullQueueError', 'DiskError', 'TimeCancelError', 'MemoryCancelError',
                  'NodeFailureError']
20
21
class CorrectorProtocolScheduler(metaclass=ABCMeta):
    """
    Abstract class to define the protocol / interface for correction operators. The client code qadapters / submission
    script generator method / ... should implement these methods.
    """

    @property
    @abstractmethod
    def name(self):
        """Name identifying the concrete scheduler corrector implementation."""
        return str()

    @abstractmethod
    def exclude_nodes(self, nodes):
        """
        Method to exclude certain nodes from being used in the calculation. It is called when a calculation seemed to
        have been crashed due to a hardware failure at the nodes specified.

            nodes: list of node numbers that were found to cause problems

        returns True if the nodes could be excluded False otherwise
        """

    @abstractmethod
    def increase_mem(self):
        """
        Method to increase the memory in the calculation. It is called when a calculation seemed to have been crashed
        due to insufficient memory.

        returns True if the memory could be increased False otherwise
        """

    @abstractmethod
    def increase_time(self):
        """
        Method to increase the time for the calculation. It is called when a calculation seemed to
        have been crashed due to a time limit.

        returns True if the time could be increased False otherwise
        """

    @abstractmethod
    def increase_cpus(self):
        """
        Method to increase the number of cpus being used in the calculation. It is called when a calculation seemed to
        have been crashed due to time or memory limits being broken.

        returns True if the number of cpus could be increased False otherwise
        """
70
71
class CorrectorProtocolApplication(metaclass=ABCMeta):
    """
    Abstract class to define the protocol / interface for correction operators. The client code quadapters / submission
    script generator method / ... should implement these methods.
    """

    @property
    @abstractmethod
    def name(self):
        """Name identifying the concrete application corrector implementation."""
        return str()

    @abstractmethod
    def decrease_mem(self):
        """
        Method to decrease the memory needed by the calculation. It is called when a calculation seemed to have been
        crashed due to insufficient memory.

        returns True if the memory could be decreased False otherwise
        """

    @abstractmethod
    def speed_up(self):
        """
        Method to speed up the calculation. It is called when a calculation seemed to have crashed due to time limits
        being broken.

        returns True if the calculation could be sped up False otherwise
        """
99
100
class AbstractError(metaclass=ABCMeta):
    """
    Error base class
    """

    def __init__(self, errmsg, meta_data):
        """
        Args:
            errmsg: the message (line) from the parsed file that identified this error.
            meta_data: dict of additional data extracted by the parser, may be None (stored as {}).
        """
        self.errmsg = errmsg
        self.meta_data = meta_data if meta_data is not None else {}

    def __str__(self):
        _message = '%s  %s\n' \
                   '  error message : %s \n' \
                   '  meta data     : %s' % (self.name, self.__doc__, self.errmsg, str(self.meta_data))
        return _message

    @property
    def name(self):
        """Name of the concrete error class."""
        return self.__class__.__name__

    @property
    def scheduler_adapter_solutions(self):
        """
        to be implemented by concrete errors returning a list of tuples defining corrections. The First element of the
          tuple should be a string of one of the methods in CorrectorProtocolScheduler, the second element should
          contain the arguments.
        """
        return []

    @property
    def application_adapter_solutions(self):
        """
        to be implemented by concrete errors returning a list of tuples defining corrections. The First element of the
          tuple should be a string of one of the methods in CorrectorProtocolApplication, the second element should
          contain the arguments.
        """
        return []

    def last_resort_solution(self):
        """
        what to do if every thing else fails...
        """
        # BUGFIX: message previously read 'non of'
        print('none of the defined solutions for %s returned success...' % self.name)
        return
144
145
class SubmitError(AbstractError):
    """
    Errors occurring at submission time. The limits on the cluster may have changed.
    """
150
151
class FullQueueError(AbstractError):
    """
    Errors occurring at submission. Too many jobs in the queue / total cpus / ... .
    """
156
157
class DiskError(AbstractError):
    """
    Errors involving problems writing to disk.
    """
162
163
class TimeCancelError(AbstractError):
    """
    Error due to exceeding the time limit for the job.
      .limit will return a list of limits that were broken, None if it could not be determined.
    """

    @property
    def limit(self):
        # List of broken limits extracted by the parser (see the 'broken_limit' meta_filter key in
        # the parser definitions below); None when the meta data does not contain it.
        return self.meta_data.get('broken_limit')

    @property
    def scheduler_adapter_solutions(self):
        # Ask the scheduler adapter for more wall time.
        return [(CorrectorProtocolScheduler.increase_time,)]

    @property
    def application_adapter_solutions(self):
        # Ask the application adapter to speed up the calculation.
        return [(CorrectorProtocolApplication.speed_up,)]
181
182
class MemoryCancelError(AbstractError):
    """
    Error due to exceeding the memory limit for the job.
      .limit will return a list of limits that were broken, None if it could not be determined.
    """

    @property
    def limit(self):
        # List of broken limits extracted by the parser (see the 'broken_limit' meta_filter key in
        # the parser definitions below); None when the meta data does not contain it.
        return self.meta_data.get('broken_limit')

    @property
    def scheduler_adapter_solutions(self):
        # Ask the scheduler adapter for more memory.
        return [(CorrectorProtocolScheduler.increase_mem,)]

    @property
    def application_adapter_solutions(self):
        # Ask the application adapter to lower the memory needed by the calculation.
        return [(CorrectorProtocolApplication.decrease_mem,)]
200
201
class MasterProcessMemoryCancelError(AbstractError):
    """
    Error due to exceeding the memory limit for the job on the master node.
    """
    # No adapter solutions are defined for this error; the base-class defaults ([]) apply.
206
207
class SlaveProcessMemoryCancelError(AbstractError):
    """
    Error due to exceeding the memory limit for the job on a node different from the master.
    """
    # No adapter solutions are defined for this error; the base-class defaults ([]) apply.
212
213
class NodeFailureError(AbstractError):
    """
    Error due to the hardware failure of a specific node.
     .nodes will return a list of problematic nodes, None if it could not be determined.
    """

    @property
    def nodes(self):
        # List of failing node numbers extracted by the parser (see the 'nodes' meta_filter key in
        # the parser definitions below); None when the meta data does not contain it.
        return self.meta_data.get('nodes')

    @property
    def scheduler_adapter_solutions(self):
        # Ask the scheduler adapter to exclude the failing nodes from subsequent runs.
        return [(CorrectorProtocolScheduler.exclude_nodes, [self.nodes])]
227
228
class AbstractErrorParser(metaclass=ABCMeta):
    """
    Abstract class for parsing errors originating from the scheduler system and error that are not reported by the
    program itself, i.e. segmentation faults.

    A concrete implementation of this class for a specific scheduler needs a class attribute ERRORS for containing a
    dictionary specifying error:

    ERRORS = {ErrorClass: {
                'file_specifier' : {
                    'string': "the string to be looked for",
                    'meta_filter': "string specifying the regular expression to obtain the meta data"
                    }
                }

    """
    def __init__(self, err_file, out_file=None, run_err_file=None, batch_err_file=None):
        """
        Args:
            err_file: stderr file of the scheduler.
            out_file: stdout file of the scheduler.
            run_err_file: stderr file of the application.
            batch_err_file: stderr file of the submission.
        """
        self.files = {'err': err_file, 'out': out_file, 'run_err': run_err_file, 'batch_err': batch_err_file}
        self.errors = []

    @property
    @abstractmethod
    def error_definitions(self):
        """Mapping of error class -> {file_specifier: {'string': ..., 'meta_filter': ...}} to search for."""
        return dict()

    @staticmethod
    def extract_metadata(lines, meta_filter):
        """
        Extract meta data from lines. meta_filter maps each meta-data key to a
        [regular_expression, group_index] pair; the matched group values are collected over all
        lines. Returns a dict mapping each key to a sorted list of the unique values found.
        """
        meta_dict = {}
        for key in meta_filter.keys():
            values = []
            for line in lines:
                match = re.match(meta_filter[key][0], line)
                if match is not None:
                    # Reuse the match object instead of running the same regex a second time.
                    values.append(match.group(meta_filter[key][1]))
            values = sorted(set(values))
            meta_dict.update({key: values})
        return meta_dict

    def parse_single(self, errmsg):
        """
        Parse the provided files for the corresponding strings.

        Returns a (found, message, metadata) tuple: found is True when the error string occurs,
        message is the last matching line (None otherwise) and metadata the extracted meta data.
        """
        found = False
        message = None
        metadata = None
        for k in errmsg.keys():
            if self.files[k] is not None:
                try:
                    with open(self.files[k], mode='r') as f:
                        lines = f.read().split('\n')
                    for line in lines:
                        if errmsg[k]['string'] in line:
                            message = line
                            found = True
                    if found:
                        metadata = self.extract_metadata(lines, errmsg[k]['meta_filter'])
                except (IOError, OSError):
                    # Best effort: a missing file only means this error can not be detected here.
                    print(self.files[k], 'not found')
                except TypeError:
                    # BUGFIX: the original called self.files[k].cls(), which does not exist and
                    # would itself raise AttributeError inside the handler.
                    print('type error', self.files[k], ' has type ', type(self.files[k]).__name__,
                          ' should be string.')

        return found, message, metadata

    def parse(self):
        """
        Parse for the occurrence of all errors defined in ERRORS.

        Appends an instance of each error class found to self.errors and returns the number of
        error definitions tested.
        """
        errors_tested = 0
        for error in self.error_definitions:
            errors_tested += 1
            result = self.parse_single(self.error_definitions[error])
            if result[0]:
                self.errors.append(error(result[1], result[2]))
        if len(self.errors) > 0:
            print('QUEUE_ERROR FOUND')
            for error in self.errors:
                print(error)

        return errors_tested
311
312
class SlurmErrorParser(AbstractErrorParser):
    """
    Implementation of the error definitions for the Slurm scheduler
    """
    @property
    def error_definitions(self):
        # Maps each error class to the file to scan ('err', 'out', 'run_err' or 'batch_err'),
        # the substring that identifies the error, and a meta_filter of
        # {key: [regular_expression, group_index]} pairs used to extract meta data.
        return {
            SubmitError: {
                'batch_err': {
                    'string': "Batch job submission failed",
                    'meta_filter': {}
                }
            },
            FullQueueError: {
                'batch_err': {
                    'string': "Job violates accounting/QOS policy",
                    'meta_filter': {}
                }
            },
            MemoryCancelError: {
                'err': {
                    'string': "Exceeded job memory limit",
                    'meta_filter': {}
                }
            },
            # Example lines this definition matches:
            # slurmstepd: error: *** JOB 1803480 CANCELLED AT 2015-12-16T14:57:32 DUE TO TIME LIMIT on lmWn009 ***
            # slurmstepd: error: *** JOB 1803712 CANCELLED AT 2015-12-17T15:21:41 DUE TO TIME LIMIT on lmWn001 ***
            TimeCancelError: {
                'err': {
                    'string': "DUE TO TIME LIMIT",
                    'meta_filter': {
                        'time_of_cancel': [r"(.*)JOB (\d+) CANCELLED AT (\S*) DUE TO TIME LIMIT(.*)", 3]
                    }
                }
            },
            NodeFailureError: {
                'run_err': {
                    'string': "can't open /dev/ipath, network down",
                    'meta_filter': {
                        'nodes': [r"node(\d+)\.(\d+)can't open (\S*), network down \(err=26\)", 1]
                    }
                }
            },
            # Placeholder entry; "a string to be found" never occurs in real output.
            AbstractError: {
                'out': {
                    'string': "a string to be found",
                    'meta_filter': {}
                }
            }
        }
363
364
class PBSErrorParser(AbstractErrorParser):
    """
    Implementation for the PBS scheduler

    Example lines these definitions match:
        PBS: job killed: walltime 932 exceeded limit 900
        PBS: job killed: walltime 46 exceeded limit 30
        PBS: job killed: vmem 2085244kb exceeded limit 1945600kb
    """

    @property
    def error_definitions(self):
        # Maps each error class to the file to scan, the substring that identifies the error, and
        # a meta_filter of {key: [regular_expression, group_index]} pairs used to extract meta data.
        return {
            TimeCancelError: {
                'out': {
                    'string': "job killed: walltime",
                    'meta_filter': {
                        'broken_limit': [r"=>> PBS: job killed: walltime (\d+) exceeded limit (\d+)", 2]
                    }
                }
            },
            # Placeholder entry; "a string to be found" never occurs in real output.
            AbstractError: {
                'out': {
                    'string': "a string to be found",
                    'meta_filter': {}
                }
            },
            MemoryCancelError: {
                'out': {
                    'string': "job killed: vmem",
                    'meta_filter': {
                        'broken_limit': [r"(.*)job killed: vmem (\d+)kb exceeded limit (\d+)kb", 3]
                    }
                }
            }
        }
399
400
# Scheduler name -> parser class. 'pbs' is accepted as a backward-compatible generic alias for
# the PBS family (the demo in __main__ requested 'pbs', which was previously not registered).
ALL_PARSERS = {'slurm': SlurmErrorParser, 'pbspro': PBSErrorParser, 'torque': PBSErrorParser,
               'pbs': PBSErrorParser}
402
403
def get_parser(scheduler, err_file, out_file=None, run_err_file=None, batch_err_file=None):
    """
    Factory function to provide the parser for the specified scheduler. If the scheduler is not implemented None is
    returned. The files, string, correspond to file names of the out and err files:
    err_file        stderr of the scheduler
    out_file        stdout of the scheduler
    run_err_file    stderr of the application
    batch_err_file  stderr of the submission

    Returns:
        None if scheduler is not supported.
    """
    try:
        parser_class = ALL_PARSERS[scheduler]
    except KeyError:
        # Unknown scheduler: no parser implementation registered.
        return None
    return parser_class(err_file, out_file, run_err_file, batch_err_file)
418
419
if __name__ == "__main__":
    # BUGFIX: the original requested 'pbs', which is not a registered scheduler name, so
    # get_parser returned None and the subsequent .parse() call raised AttributeError.
    my_parser = get_parser('pbspro', err_file='queue.err', out_file='queue.out', run_err_file='run.err',
                           batch_err_file='sbatch.err')
    if my_parser is None:
        print('no parser available for this scheduler')
    else:
        my_parser.parse()
        print('parser.errors', my_parser.errors)
        for my_error in my_parser.errors:
            print(my_error)
427