1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16# We cannot use the builtins module here from Python-Future.
17# We need to use the native __builtin__ module on Python 2,
18# and builtins module on Python 3, because we need to override
19# the actual native open method.
20
21from __future__ import absolute_import
22from __future__ import print_function
23from future.utils import PY3
24from future.utils import string_types
25
26import errno
27import os
28import re
29import shutil
30import sys
31from io import BytesIO
32from io import StringIO
33
34import mock
35
36from twisted.python import log
37
38from buildbot_worker.scripts import base
39
40try:
41    # Python 2
42    import __builtin__ as builtins
43except ImportError:
44    # Python 3
45    import builtins
46
47
def nl(s):
    r"""Return ``s`` with UNIX newlines (\n) swapped for the native line
    separator (``os.linesep``).

    Handy for building the expected value of an assertEqual against output
    that uses the platform's line endings.  Anything that is not a string
    is handed back unchanged."""
    if isinstance(s, string_types):
        return s.replace('\n', os.linesep)
    return s
55
56
class BasedirMixin(object):

    """Test mixin that manages a scratch directory path, self.basedir.

    Call setUpBasedir before the test body and tearDownBasedir afterwards;
    both remove any directory tree found at that path.  Neither creates
    the directory itself."""

    def setUpBasedir(self):
        """Record the basedir name and delete any leftover tree there."""
        # Relative path: it resolves against the current working directory.
        basedir = "test-basedir"
        self.basedir = basedir
        if not os.path.exists(basedir):
            return
        shutil.rmtree(basedir)

    def tearDownBasedir(self):
        """Delete the basedir tree if the test left one behind."""
        leftover = self.basedir
        if not os.path.exists(leftover):
            return
        shutil.rmtree(leftover)
70
71
class IsWorkerDirMixin(object):

    """
    Mixin that replaces base.isWorkerDir() with a mock for the duration
    of a test
    """

    def setupUpIsWorkerDir(self, return_value):
        """
        patch base.isWorkerDir with a Mock, also kept as self.isWorkerDir

        @param return_value: value the mocked isWorkerDir() returns
        """
        fake_is_worker_dir = mock.Mock(return_value=return_value)
        self.isWorkerDir = fake_is_worker_dir
        self.patch(base, "isWorkerDir", fake_is_worker_dir)
81
82
class PatcherMixin(object):

    """
    Mix this in to get patching helpers for cases that the plain
    'patch' method does not cover
    """

    def patch_os_uname(self, replacement):
        """
        Make os.uname refer to replacement for the duration of the test

        @param replacement: object to install as os.uname
        """
        if not hasattr(os, 'uname'):
            # twisted's 'patch' doesn't handle an attribute that doesn't
            # exist, so install it by hand and register its removal.
            def remove_uname():
                delattr(os, 'uname')
            self.addCleanup(remove_uname)
            os.uname = replacement
            return

        self.patch(os, 'uname', replacement)
99
100
class FileIOMixin(object):

    """
    Mixin that swaps the builtin open() for mocks, so that tests can
    simulate successful file I/O as well as failures in open(), read()
    and write().
    """

    def _patchOpenReturning(self, fileobj):
        """
        patch open() so that every call hands back the given mock file object

        @param fileobj: mock file object, also published as self.fileobj
        """
        self.fileobj = fileobj
        self.open = mock.Mock(return_value=fileobj)
        self.patch(builtins, "open", self.open)

    def setUpOpen(self, file_contents="dummy-contents"):
        """
        patch open() to return file object with provided contents.

        @param file_contents: read_data handed to mock.mock_open(); this is
                              what the file object's read() yields
        """
        # mock.mock_open() builds a stand-in for open(); calling it once
        # yields the mock file handle (read(), write(), __enter__(),
        # __exit__()) that is reused for every patched open() call.
        handle = mock.mock_open(read_data=file_contents)()
        self._patchOpenReturning(handle)

    def setUpOpenError(self, errno=errno.ENOENT, strerror="dummy-msg",
                       filename="dummy-file"):
        """
        patch open() to raise IOError

        @param    errno: exception's errno value
        @param strerror: exception's strerror value
        @param filename: exception's filename value
        """
        # Here the open() substitute itself fails: its side_effect makes
        # every call raise the IOError, so no file object is ever returned.
        failing_open = mock.mock_open()
        failing_open.side_effect = IOError(errno, strerror, filename)

        self.open = failing_open
        self.patch(builtins, "open", self.open)

    def setUpReadError(self, errno=errno.EIO, strerror="dummy-msg",
                       filename="dummy-file"):
        """
        patch open() to return a file object that will raise IOError on read()

        @param    errno: exception's errno value
        @param strerror: exception's strerror value
        @param filename: exception's filename value

        """
        # open() succeeds, but the handle's read() always raises.
        handle = mock.mock_open()()
        handle.read.side_effect = IOError(errno, strerror, filename)
        self._patchOpenReturning(handle)

    def setUpWriteError(self, errno=errno.ENOSPC, strerror="dummy-msg",
                        filename="dummy-file"):
        """
        patch open() to return a file object that will raise IOError on write()

        @param    errno: exception's errno value
        @param strerror: exception's strerror value
        @param filename: exception's filename value
        """
        # open() succeeds, but the handle's write() always raises.
        handle = mock.mock_open()()
        handle.write.side_effect = IOError(errno, strerror, filename)
        self._patchOpenReturning(handle)
197
198
class LoggingMixin(object):

    """
    Mix this in to record twisted log events during a test and to make
    assertions about what was logged
    """

    def setUpLogging(self):
        """
        Start collecting log events in self._logEvents; the observer is
        removed again by a cleanup registered with addCleanup.
        """
        self._logEvents = []
        log.addObserver(self._logEvents.append)
        self.addCleanup(log.removeObserver, self._logEvents.append)

    def assertLogged(self, *args):
        """
        Fail unless every given regular expression matches the text of at
        least one collected log event.

        @param args: regular expression patterns; each one is searched for
                     (re.search semantics) in the text of the events
        """
        # Render each event once; events without text come back as None.
        messages = [log.textFromEventDict(event)
                    for event in self._logEvents]
        for regexp in args:
            r = re.compile(regexp)
            # Each pattern is checked independently.  Returning as soon as
            # one pattern matched would silently skip the remaining ones.
            matched = any(msg is not None and r.search(msg)
                          for msg in messages)
            if not matched:
                self.fail(
                    "{0!r} not matched in log output.\n{1} ".format(
                        regexp, self._logEvents))

    def assertWasQuiet(self):
        """Fail if any log event was collected."""
        self.assertEqual(self._logEvents, [])
218
219
class StdoutAssertionsMixin(object):

    """
    Mix this in to capture sys.stdout during a test and make assertions
    about what was written to it
    """

    def setUpStdoutAssertions(self):
        """
        Replace sys.stdout with an in-memory stream kept as self.stdout
        """
        # The replacement has to match the native flavour of sys.stdout:
        # a byte stream on Python 2, a text stream on Python 3.
        stream_class = StringIO if PY3 else BytesIO
        self.stdout = stream_class()
        self.patch(sys, 'stdout', self.stdout)

    def assertWasQuiet(self):
        """Assert that nothing has been written to the captured stdout."""
        captured = self.stdout.getvalue()
        self.assertEqual(captured, '')

    def assertInStdout(self, exp):
        """Assert that exp occurs somewhere in the captured stdout."""
        captured = self.stdout.getvalue()
        self.assertIn(exp, captured)

    def assertStdoutEqual(self, exp, msg=None):
        """Assert that the captured stdout equals exp exactly."""
        captured = self.stdout.getvalue()
        self.assertEqual(exp, captured, msg)

    def getStdout(self):
        """Return the captured stdout with surrounding whitespace stripped."""
        captured = self.stdout.getvalue()
        return captured.strip()
250