1import os
2import datetime
3import operator
4from sys import executable
5
6import pytest
7
8from doit import exceptions
9from doit import tools
10from doit import task
11
12
class TestCreateFolder(object):
    """Tests for ``tools.create_folder``."""

    def test_create_folder(self):
        """A nested path that does not exist yet is created."""
        DIR_DEP = os.path.join(os.path.dirname(__file__), "parent/child/")

        def rm_dir():
            if os.path.exists(DIR_DEP):
                os.removedirs(DIR_DEP)

        # start from a clean state in case an earlier run left the folder
        rm_dir()
        try:
            tools.create_folder(DIR_DEP)
            assert os.path.exists(DIR_DEP)
        finally:
            # clean up even when the assertion above fails
            rm_dir()

    def test_error_if_path_is_a_file(self):
        """``OSError`` is raised when the target path is a regular file."""
        path = os.path.join(os.path.dirname(__file__), "test_create_folder")
        with open(path, 'w') as fp:
            fp.write('testing')
        try:
            pytest.raises(OSError, tools.create_folder, path)
        finally:
            # do not leave the scratch file behind if the check fails
            if os.path.exists(path):
                os.remove(path)
35
36
class TestTitleWithActions(object):
    """Tests for ``tools.title_with_actions`` used as a task ``title``."""

    def test_actions(self):
        """The title of a task with one action shows that action."""
        with_action = task.Task(
            "MyName", ["MyAction"], title=tools.title_with_actions)
        expected = "MyName => Cmd: MyAction"
        assert expected == with_action.title()

    def test_group(self):
        """The title of a task without actions lists its ``task_dep``."""
        group = task.Task(
            "MyName",
            None,
            file_dep=['file_foo'],
            task_dep=['t1', 't2'],
            title=tools.title_with_actions,
        )
        expected = "MyName => Group: t1, t2"
        assert expected == group.title()
46
47
class TestRunOnce(object):
    """Tests for the ``tools.run_once`` uptodate check."""

    def test_run(self):
        """Reported False before values are saved and True afterwards."""
        once_task = task.Task("TaskX", None, uptodate=[tools.run_once])

        before_save = tools.run_once(once_task, once_task.values)
        assert False == before_save

        once_task.save_extra_values()

        after_save = tools.run_once(once_task, once_task.values)
        assert True == after_save
54
55
class TestConfigChanged(object):
    """Tests for the ``tools.config_changed`` uptodate check."""

    @staticmethod
    def _check_saved_vs_other(saved, other):
        """Run the scenario shared by the string/unicode/dict tests.

        ``saved`` is attached to a fresh task through ``uptodate``;
        ``other`` is a check built from a different configuration that is
        only ever called against that same task.
        """
        t1 = task.Task("TaskX", None, uptodate=[saved])

        # before values are saved both checks report False
        assert False == saved(t1, t1.values)
        assert False == other(t1, t1.values)

        t1.save_extra_values()

        # after saving, only the check attached to the task reports True
        assert True == saved(t1, t1.values)
        assert False == other(t1, t1.values)

    def test_invalid_type(self):
        """Calling a check built from an arbitrary object raises."""
        class NotValid(object):
            pass

        uptodate = tools.config_changed(NotValid())
        pytest.raises(Exception, uptodate, None, None)

    def test_string(self):
        """Configuration given as a plain string."""
        check_a = tools.config_changed('a')
        check_b = tools.config_changed('b')
        self._check_saved_vs_other(check_a, check_b)

    def test_unicode(self):
        """Configuration given as a dict holding non-ASCII text."""
        check_a = tools.config_changed({'x': "中文"})
        check_b = tools.config_changed('b')
        self._check_saved_vs_other(check_a, check_b)

    def test_dict(self):
        """Configuration given as dicts differing in a single value."""
        check_a = tools.config_changed({'x': 'a', 'y': 1})
        check_b = tools.config_changed({'x': 'b', 'y': 1})
        self._check_saved_vs_other(check_a, check_b)
91
92
class TestTimeout(object):
    """Tests for the ``tools.timeout`` uptodate check."""

    @staticmethod
    def _check_expiration(monkeypatch, limit, start, within, after):
        """Drive a ``tools.timeout(limit)`` check through three clock values.

        ``tools.time_module.time`` is patched to return ``start`` while the
        check is built and its values are saved, then ``within`` (expected
        to be reported up-to-date) and finally ``after`` (expected to be
        reported not up-to-date).
        """
        def set_now(value):
            monkeypatch.setattr(tools.time_module, 'time', lambda: value)

        set_now(start)
        uptodate = tools.timeout(limit)
        t = task.Task("TaskX", None, uptodate=[uptodate])

        # nothing saved yet
        assert False == uptodate(t, t.values)
        t.save_extra_values()
        assert start == t.values['success-time']

        set_now(within)
        assert True == uptodate(t, t.values)

        set_now(after)
        assert False == uptodate(t, t.values)

    def test_invalid(self):
        """Building a timeout from the string "abc" raises."""
        pytest.raises(Exception, tools.timeout, "abc")

    def test_int(self, monkeypatch):
        """Limit given as the integer 5."""
        self._check_expiration(
            monkeypatch, 5, start=100, within=103, after=106)

    def test_timedelta(self, monkeypatch):
        """Limit given as a ``timedelta`` of two minutes."""
        self._check_expiration(
            monkeypatch,
            datetime.timedelta(minutes=2),
            start=10,
            within=100,
            after=200,
        )

    def test_timedelta_big(self, monkeypatch):
        """Limit given as a ``timedelta`` that includes whole days."""
        self._check_expiration(
            monkeypatch,
            datetime.timedelta(days=2, minutes=5),
            start=10,
            within=3600 * 30,
            after=3600 * 49,
        )
145
146
@pytest.fixture
def checked_file(request):
    """Fixture returning the name of a file that exists during the test.

    The file is opened in append mode (created when missing) and removed
    again by a finalizer registered on ``request``.
    """
    fname = 'mytmpfile'
    with open(fname, 'a'):
        pass
    request.addfinalizer(lambda: os.remove(fname))
    return fname
156
157
class TestCheckTimestampUnchanged(object):
    """Tests for the ``tools.check_timestamp_unchanged`` uptodate check."""

    def test_time_selection(self):
        """Each supported time name maps to the matching ``st_*`` attribute."""
        for time_name in ('atime', 'ctime', 'mtime'):
            check = tools.check_timestamp_unchanged(
                'check_' + time_name, time_name)
            assert 'st_' + time_name == check._timeattr

        # an unknown time name is rejected when the check is built
        pytest.raises(
            ValueError,
            tools.check_timestamp_unchanged, 'check_invalid_time', 'foo')

    def test_file_missing(self):
        """``OSError`` is raised when the checked file does not exist."""
        check = tools.check_timestamp_unchanged('no_such_file')
        t = task.Task("TaskX", None, uptodate=[check])
        # fake values saved from previous run
        task_values = {check._key: 1}  # needs any value different from None
        pytest.raises(OSError, check, t, task_values)

    def test_op_ge(self, monkeypatch, checked_file):
        """Behaviour of the check with ``operator.ge`` as comparison."""
        check = tools.check_timestamp_unchanged(
            checked_file, cmp_op=operator.ge)
        t = task.Task("TaskX", None, uptodate=[check])

        # no stored value/first run
        assert False == check(t, t.values)

        # value just stored is equal to itself
        t.save_extra_values()
        assert True == check(t, t.values)

        # stored timestamp is less than the (faked) current one, so with
        # ``ge`` the check is NOT up to date.
        # Look the stored value up by the check's own key instead of relying
        # on it being the first entry of ``t.values``.
        future_time = t.values[check._key] + 100
        monkeypatch.setattr(check, '_get_time', lambda: future_time)
        assert False == check(t, t.values)

    def test_op_bad_custom(self, monkeypatch, checked_file):
        """An exception raised by a custom operator reaches the caller."""
        # handling misbehaving custom operators
        def bad_op(prev_time, current_time):
            raise Exception('oops')

        check = tools.check_timestamp_unchanged(checked_file, cmp_op=bad_op)
        t = task.Task("TaskX", None, uptodate=[check])
        # fake values saved from previous run
        task_values = {check._key: 1}  # needs any value different from None
        pytest.raises(Exception, check, t, task_values)

    def test_multiple_checks(self):
        """Checks on different times of one file use different keys."""
        # handling multiple checks on one file (should save values in such way
        # they don't override each other)
        check_a = tools.check_timestamp_unchanged('check_multi', 'atime')
        check_m = tools.check_timestamp_unchanged('check_multi', 'mtime')
        assert check_a._key != check_m._key
215
216
class TestLongRunning(object):
    """Tests for the ``tools.LongRunning`` action."""

    def test_success(self):
        """``execute()`` returns None for the sample process."""
        here = os.path.dirname(__file__)
        program = "%s %s/sample_process.py" % (executable, here)
        # NOTE(review): the command is given " please fail" yet None is
        # expected -- presumably the process result is ignored; confirm.
        long_running = tools.LongRunning(program + " please fail")
        result = long_running.execute()
        assert result is None

    def test_ignore_keyboard_interrupt(self, monkeypatch):
        """A ``KeyboardInterrupt`` raised while waiting is not propagated."""
        class FakeRaiseInterruptProcess(object):
            """Stand-in for ``Popen`` whose ``wait`` always interrupts."""

            def __init__(self, *args, **kwargs):
                pass

            def wait(self):
                raise KeyboardInterrupt()

        long_running = tools.LongRunning('')
        monkeypatch.setattr(
            tools.subprocess, 'Popen', FakeRaiseInterruptProcess)
        result = long_running.execute()
        assert result is None
235
class TestInteractive(object):
    """Tests for the ``tools.Interactive`` action."""

    @staticmethod
    def _execute_sample(arguments):
        """Run ``sample_process.py`` with ``arguments`` appended.

        Returns whatever ``tools.Interactive(...).execute()`` returns.
        """
        here = os.path.dirname(__file__)
        program = "%s %s/sample_process.py" % (executable, here)
        action = tools.Interactive(program + arguments)
        return action.execute()

    def test_fail(self):
        """A failing sample process yields a ``TaskFailed`` result."""
        result = self._execute_sample(" please fail")
        assert isinstance(result, exceptions.TaskFailed)

    def test_success(self):
        """A successful sample process yields None."""
        result = self._execute_sample(" ok")
        assert result is None
250
251
class TestPythonInteractiveAction(object):
    """Tests for the ``tools.PythonInteractiveAction`` action."""

    def test_success(self):
        """A callable that only prints makes ``execute()`` return None."""
        def hello():
            print('hello')

        action = tools.PythonInteractiveAction(hello)
        outcome = action.execute()
        assert outcome is None

    def test_ignore_keyboard_interrupt(self, monkeypatch):
        """An exception raised by the callable yields a ``TaskError``.

        NOTE(review): despite its name this test raises a plain
        ``Exception`` (no ``KeyboardInterrupt``) and never uses
        ``monkeypatch``.
        """
        def raise_x():
            raise Exception('x')

        action = tools.PythonInteractiveAction(raise_x)
        outcome = action.execute()
        assert isinstance(outcome, exceptions.TaskError)

    def test_returned_dict_saved_result_values(self):
        """A returned dict is exposed as both ``result`` and ``values``."""
        def val():
            return {'x': 3}

        action = tools.PythonInteractiveAction(val)
        outcome = action.execute()
        assert outcome is None
        assert action.result == {'x': 3}
        assert action.values == {'x': 3}

    def test_returned_string_saved_result(self):
        """A returned string is exposed as ``result``."""
        def val():
            return 'hello'

        action = tools.PythonInteractiveAction(val)
        outcome = action.execute()
        assert outcome is None
        assert action.result == 'hello'
279