1# coding=utf-8
2# Permission is hereby granted, free of charge, to any person
3# obtaining a copy of this software and associated documentation
4# files (the "Software"), to deal in the Software without
5# restriction, including without limitation the rights to use,
6# copy, modify, merge, publish, distribute, sublicense, and/or
7# sell copies of the Software, and to permit persons to whom the
8# Software is furnished to do so, subject to the following
9# conditions:
10#
11# This permission notice shall be included in all copies or
12# substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
15# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
16# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17# PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHOR(S) BE
18# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
19# AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF
20# OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21# DEALINGS IN THE SOFTWARE.
22
23""" Module for results generation """
24
25import collections
26import copy
27import datetime
28
29from framework import status, exceptions, grouptools
30
31__all__ = [
32    'TestrunResult',
33    'TestResult',
34]
35
36
37class Subtests(collections.abc.MutableMapping):
38    """A dict-like object that stores Statuses as values."""
39    def __init__(self, dict_=None):
40        self.__container = collections.OrderedDict()
41
42        if dict_ is not None:
43            self.update(dict_)
44
45    def __setitem__(self, name, value):
46        self.__container[name.lower()] = status.status_lookup(value)
47
48    def __getitem__(self, name):
49        return self.__container[name.lower()]
50
51    def __delitem__(self, name):
52        del self.__container[name.lower()]
53
54    def __iter__(self):
55        return iter(self.__container)
56
57    def __len__(self):
58        return len(self.__container)
59
60    def __repr__(self):
61        return repr(self.__container)
62
63    def to_json(self):
64        res = dict(self)
65        res['__type__'] = 'Subtests'
66        return res
67
68    @classmethod
69    def from_dict(cls, dict_):
70        if '__type__' in dict_:
71            del dict_['__type__']
72
73        res = cls(dict_)
74
75        return res
76
77
78class StringDescriptor(object):  # pylint: disable=too-few-public-methods
79    """A Shared data descriptor class for TestResult.
80
81    This provides a property that can be passed a str or unicode, but always
82    returns a unicode object.
83
84    """
85    def __init__(self, name, default=''):
86        assert isinstance(default, str)
87        self.__name = name
88        self.__default = default
89
90    def __get__(self, instance, cls):
91        return getattr(instance, self.__name, self.__default)
92
93    def __set__(self, instance, value):
94        if isinstance(value, bytes):
95            setattr(instance, self.__name, value.decode('utf-8', 'replace'))
96        elif isinstance(value, str):
97            setattr(instance, self.__name, value)
98        else:
99            raise TypeError('{} attribute must be a unicode or bytes instance, '
100                            'but was {}.'.format(self.__name, type(value)))
101
102    def __delete__(self, instance):
103        raise NotImplementedError
104
105
106class TimeAttribute(object):
107    """Attribute of TestResult for time.
108
109    This attribute provides a couple of nice helpers. It stores the start and
110    end time and provides methods for getting the total and delta of the times.
111
112    """
113    __slots__ = ['start', 'end']
114
115    def __init__(self, start=0.0, end=0.0):
116        self.start = start
117        self.end = end
118
119    @property
120    def total(self):
121        return self.end - self.start
122
123    @property
124    def delta(self):
125        return str(datetime.timedelta(seconds=self.total))
126
127    def to_json(self):
128        return {
129            'start': self.start,
130            'end': self.end,
131            '__type__': 'TimeAttribute',
132        }
133
134    @classmethod
135    def from_dict(cls, dict_):
136        dict_ = copy.copy(dict_)
137
138        if '__type__' in dict_:
139            del dict_['__type__']
140        return cls(**dict_)
141
142
143class TestResult(object):
144    """An object representing the result of a single test."""
145    __slots__ = ['returncode', '_err', '_out', 'time', 'command', 'traceback',
146                 'environment', 'subtests', 'dmesg', '__result', 'images',
147                 'exception', 'pid']
148    err = StringDescriptor('_err')
149    out = StringDescriptor('_out')
150
151    def __init__(self, result=None):
152        self.returncode = None
153        self.time = TimeAttribute()
154        self.command = str()
155        self.environment = str()
156        self.subtests = Subtests()
157        self.dmesg = str()
158        self.images = None
159        self.traceback = None
160        self.exception = None
161        self.pid = []
162        if result:
163            self.result = result
164        else:
165            self.__result = status.NOTRUN
166
167    @property
168    def result(self):
169        """Return the result of the test.
170
171        If there are subtests return the "worst" value of those subtests. If
172        there are not return the stored value of the test. There is an
173        exception to this rule, and that's if the status is crash; since this
174        status is set by the framework, and can be generated even when some or
175        all unit tests pass.
176
177        """
178        if self.subtests and self.__result != status.CRASH:
179            return max(self.subtests.values())
180        return self.__result
181
182    @property
183    def raw_result(self):
184        """Get the result of the test without taking subtests into account."""
185        return self.__result
186
187    @result.setter
188    def result(self, new):
189        try:
190            self.__result = status.status_lookup(new)
191        except exceptions.PiglitInternalError as e:
192            raise exceptions.PiglitFatalError(str(e))
193
194    def to_json(self):
195        """Return the TestResult as a json serializable object."""
196        obj = {
197            '__type__': 'TestResult',
198            'command': self.command,
199            'environment': self.environment,
200            'err': self.err,
201            'out': self.out,
202            'result': self.result,
203            'returncode': self.returncode,
204            'subtests': self.subtests.to_json(),
205            'time': self.time.to_json(),
206            'exception': self.exception,
207            'traceback': self.traceback,
208            'dmesg': self.dmesg,
209            'pid': self.pid,
210        }
211        return obj
212
213    @classmethod
214    def from_dict(cls, dict_):
215        """Load an already generated result in dictionary form.
216
217        This is used as an alternate constructor which converts an existing
218        dictionary into a TestResult object. It converts a key 'result' into a
219        status.Status object
220
221        """
222        # pylint will say that assining to inst.out or inst.err is a non-slot
223        # because self.err and self.out are descriptors, methods that act like
224        # variables. Just silence pylint
225        # pylint: disable=assigning-non-slot
226        inst = cls()
227
228        for each in ['returncode', 'command', 'exception', 'environment',
229                     'traceback', 'dmesg', 'pid', 'result']:
230            if each in dict_:
231                setattr(inst, each, dict_[each])
232
233        # Set special instances
234        if 'subtests' in dict_:
235            inst.subtests = Subtests.from_dict(dict_['subtests'])
236        if 'time' in dict_:
237            inst.time = TimeAttribute.from_dict(dict_['time'])
238
239        # out and err must be set manually to avoid replacing the setter
240        if 'out' in dict_:
241            inst.out = dict_['out']
242        if 'err' in dict_:
243            inst.err = dict_['err']
244
245        return inst
246
247    def update(self, dict_):
248        """Update the results and subtests fields from a piglit test.
249
250        Native piglit tests output their data as valid json, and piglit uses
251        the json module to parse this data. This method consumes that raw
252        dictionary data and updates itself.
253
254        """
255        if 'result' in dict_:
256            self.result = dict_['result']
257        elif 'subtest' in dict_:
258            self.subtests.update(dict_['subtest'])
259
260
261class Totals(dict):
262    def __init__(self, *args, **kwargs):
263        super(Totals, self).__init__(*args, **kwargs)
264        for each in status.ALL:
265            each = str(each)
266            if each not in self:
267                self[each] = 0
268
269    def __bool__(self):
270        # Since totals are prepopulated, calling 'if not <Totals instance>'
271        # will always result in True, this will cause it to return True only if
272        # one of the values is not zero
273        for each in self.values():
274            if each != 0:
275                return True
276        return False
277
278    def to_json(self):
279        """Convert totals to a json object."""
280        result = copy.copy(self)
281        result['__type__'] = 'Totals'
282        return result
283
284    @classmethod
285    def from_dict(cls, dict_):
286        """Convert a dictionary into a Totals object."""
287        tots = cls(dict_)
288        if '__type__' in tots:
289            del tots['__type__']
290        return tots
291
292
293class TestrunResult(object):
294    """The result of a single piglit run."""
295    def __init__(self):
296        self.info = {}
297        self.options = {}
298        self.time_elapsed = TimeAttribute()
299        self.tests = collections.OrderedDict()
300        self.totals = collections.defaultdict(Totals)
301
302    def get_result(self, key):
303        """Get the result of a test or subtest.
304
305        If neither a test nor a subtest instance exist, then raise the original
306        KeyError generated from looking up <key> in the tests attribute. It is
307        the job of the caller to handle this error.
308
309        Arguments:
310        key -- the key name of the test to return
311
312        """
313        try:
314            return self.tests[key].result
315        except KeyError as e:
316            name, test = grouptools.splitname(key)
317            try:
318                return self.tests[name].subtests[test]
319            except KeyError:
320                raise e
321
322    def calculate_group_totals(self):
323        """Calculate the number of passes, fails, etc at each level."""
324        for name, result in self.tests.items():
325            # If there are subtests treat the test as if it is a group instead
326            # of a test.
327            if result.subtests:
328                for res in result.subtests.values():
329                    res = str(res)
330                    temp = name
331
332                    self.totals[temp][res] += 1
333                    while temp:
334                        temp = grouptools.groupname(temp)
335                        self.totals[temp][res] += 1
336                    self.totals['root'][res] += 1
337            else:
338                res = str(result.result)
339                while name:
340                    name = grouptools.groupname(name)
341                    self.totals[name][res] += 1
342                self.totals['root'][res] += 1
343
344    def to_json(self):
345        if not self.totals:
346            self.calculate_group_totals()
347        rep = copy.copy(self.__dict__)
348        rep['tests'] = collections.OrderedDict((k, t.to_json())
349                       for k, t in self.tests.items())
350        rep['__type__'] = 'TestrunResult'
351        return rep
352
353    @classmethod
354    def from_dict(cls, dict_, _no_totals=False):
355        """Convert a dictionary into a TestrunResult.
356
357        This method is meant to be used for loading results from json or
358        similar formats
359
360        _no_totals is not meant to be used externally, it allows us to control
361        the generation of totals when loading old results formats.
362
363        """
364        res = cls()
365        for name in ['name', 'info', 'options', 'results_version']:
366            value = dict_.get(name)
367            if value:
368                setattr(res, name, value)
369
370        # Since this is used to load partial metadata when writing final test
371        # results there is no guarantee that this will have a "time_elapsed"
372        # key
373        if 'time_elapsed' in dict_:
374            setattr(res, 'time_elapsed',
375                    TimeAttribute.from_dict(dict_['time_elapsed']))
376        res.tests = collections.OrderedDict((n, TestResult.from_dict(t))
377                    for n, t in dict_['tests'].items())
378
379        if not 'totals' in dict_ and not _no_totals:
380            res.calculate_group_totals()
381        else:
382            res.totals = {n: Totals.from_dict(t) for n, t in
383                          dict_['totals'].items()}
384
385        return res
386