1import os
2import os.path
3import sys
4import signal
5import tempfile
6import pytest
7from contextlib import contextmanager
8try:
9    from unittest.mock import patch
10except ImportError:
11    from mock import patch
12
13try:
14    from subprocess import run
15except ImportError:
16    # python2 support
17    from subprocess import call as run
18
19import pid
20
21pid.DEFAULT_PID_DIR = tempfile.gettempdir()
22
23if sys.platform == "win32":
24    # Fix backslashes on windows to properly execute "run" command
25    pid.DEFAULT_PID_DIR = pid.DEFAULT_PID_DIR.replace("\\", "/")
26
27
28# https://code.google.com/p/python-nose/issues/detail?id=175
29@contextmanager
30def raising(*exc_types):
31    """
32    A context manager to ensure that an exception of a given list of
33    types is thrown.
34
35    Instead of::
36
37      @nose.tools.raises(ValueError)
38      def test_that_raises():
39        # ... lengthy setup
40        raise ValueError
41
42    you can write::
43
44      def test_that_raises_at_the_end():
45        # ... lengthy setup
46        with raising(ValueError):
47          raise ValueError
48
49    to make the scope for catching exceptions as small as possible.
50    """
51    try:
52        yield
53    except exc_types:
54        pass
55    except Exception:
56        raise
57    else:
58        raise AssertionError("Failed to throw exception of type(s) %s." % (", ".join(exc_type.__name__ for exc_type in exc_types),))
59
60
61@contextmanager
62def raising_windows_io_error():
63    try:
64        yield
65    except IOError as exc:
66        if exc.errno != 13:
67            raise
68    except Exception:
69        raise
70    else:
71        raise AssertionError("Failed to throw exception")
72
73
74def test_pid_class():
75    pidfile = pid.PidFile()
76    pidfile.create()
77    pidfile.close()
78    assert not os.path.exists(pidfile.filename)
79
80
81def test_pid_context_manager():
82    with pid.PidFile() as pidfile:
83        pass
84
85    assert not os.path.exists(pidfile.filename)
86
87
88@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
89def test_pid_pid():
90    with pid.PidFile() as pidfile:
91        pidnr = int(open(pidfile.filename, "r").readline().strip())
92        assert pidnr == os.getpid(), "%s != %s" % (pidnr, os.getpid())
93    assert not os.path.exists(pidfile.filename)
94
95
96@pytest.mark.skipif(sys.platform != "win32", reason="only runs on windows")
97def test_pid_pid_win32():
98    def read_pidfile_data():
99        return open(pidfile.filename, "r").readline().strip()
100
101    with pid.PidFile() as pidfile:
102        # On windows Python2 opens a file but reads an empty line from it
103        # Python3 throws IOError(13, Access denied) instead, which we are catching with raising_windows_io_error()
104        if sys.version_info.major < 3:
105            pidtext = read_pidfile_data()
106            assert pidtext == "", "Read '%s' from locked file on Windows with Python2" % (pidtext)
107        else:
108            with raising_windows_io_error():
109                pidtext = read_pidfile_data()
110    assert not os.path.exists(pidfile.filename)
111
112
113def test_pid_custom_name():
114    with pid.PidFile(pidname="testpidfile") as pidfile:
115        pass
116    assert not os.path.exists(pidfile.filename)
117
118
119def test_pid_enforce_dotpid_postfix():
120    with pid.PidFile(pidname="testpidfile", enforce_dotpid_postfix=False) as pidfile:
121        assert not pidfile.filename.endswith(".pid")
122    assert not os.path.exists(pidfile.filename)
123
124
125def test_pid_force_tmpdir():
126    with pid.PidFile(force_tmpdir=True) as pidfile:
127        pass
128    assert not os.path.exists(pidfile.filename)
129
130
131def test_pid_custom_dir():
132    with pid.PidFile(piddir=os.path.join(pid.DEFAULT_PID_DIR, "testpidfile.dir")) as pidfile:
133        pass
134    assert not os.path.exists(pidfile.filename)
135
136
137def test_pid_piddir_exists_as_file():
138    with tempfile.NamedTemporaryFile() as tmpfile:
139        with raising(IOError):
140            with pid.PidFile(piddir=tmpfile.name):
141                pass
142
143
144def test_pid_no_term_signal():
145    def _noop(*args, **kwargs):
146        pass
147
148    signal.signal(signal.SIGTERM, _noop)
149    with pid.PidFile(register_term_signal_handler=False) as pidfile:
150        assert signal.getsignal(signal.SIGTERM) is _noop
151    assert not os.path.exists(pidfile.filename)
152
153
154def test_pid_term_signal():
155    def _noop(*args, **kwargs):
156        pass
157
158    signal.signal(signal.SIGTERM, _noop)
159    with pid.PidFile(register_term_signal_handler=True) as pidfile:
160        assert signal.getsignal(signal.SIGTERM) is not _noop
161    assert not os.path.exists(pidfile.filename)
162
163
164def test_pid_force_register_term_signal_handler():
165    def _noop(*args, **kwargs):
166        pass
167
168    def _custom_signal_func(*args, **kwargs):
169        pass
170
171    signal.signal(signal.SIGTERM, _custom_signal_func)
172    assert signal.getsignal(signal.SIGTERM) is _custom_signal_func
173    with pid.PidFile(register_term_signal_handler=True) as pidfile:
174        assert signal.getsignal(signal.SIGTERM) is not _custom_signal_func
175    assert not os.path.exists(pidfile.filename)
176
177
178def test_pid_supply_term_signal_handler():
179    def _noop(*args, **kwargs):
180        pass
181
182    signal.signal(signal.SIGTERM, signal.SIG_IGN)
183
184    with pid.PidFile(register_term_signal_handler=_noop) as pidfile:
185        assert signal.getsignal(signal.SIGTERM) is _noop
186    assert not os.path.exists(pidfile.filename)
187
188
189@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
190def test_pid_chmod():
191    with pid.PidFile(chmod=0o600) as pidfile:
192        pass
193    assert not os.path.exists(pidfile.filename)
194
195
196@pytest.mark.skipif(sys.platform != "win32", reason="only runs on windows")
197def test_pid_chmod_win32():
198    with raising(pid.PidFileConfigurationError):
199        with pid.PidFile(chmod=0o600):
200            pass
201
202
203def test_pid_already_locked():
204    with pid.PidFile() as _pid:
205        with raising(pid.PidFileAlreadyLockedError):
206            with pid.PidFile():
207                pass
208        assert os.path.exists(_pid.filename)
209    assert not os.path.exists(_pid.filename)
210
211
212def test_pid_already_locked_custom_name():
213    with pid.PidFile(pidname="testpidfile") as _pid:
214        with raising(pid.PidFileAlreadyLockedError):
215            with pid.PidFile(pidname="testpidfile"):
216                pass
217        assert os.path.exists(_pid.filename)
218    assert not os.path.exists(_pid.filename)
219
220
221def test_pid_already_locked_multi_process():
222    pidname = "test_pid_already_locked_multi_process"
223    piddir = pid.DEFAULT_PID_DIR
224    with pid.PidFile(pidname=pidname, piddir=piddir) as _pid:
225        s = """
226import sys, pid
227try:
228    with pid.PidFile(pidname="%s", piddir="%s"):
229        pass
230except pid.PidFileAlreadyLockedError:
231    sys.exit(123)
232""" % (pidname, piddir)
233        result = run([sys.executable, '-c', s])
234        returncode = result if isinstance(result, int) else result.returncode
235        assert returncode == 123
236        assert os.path.exists(_pid.filename)
237    assert not os.path.exists(_pid.filename)
238
239
240def test_pid_two_locks_multi_process():
241    with pid.PidFile() as _pid:
242        s = """
243import os, pid
244with pid.PidFile("pytest2", piddir="%s") as _pid:
245    assert os.path.exists(_pid.filename)
246assert not os.path.exists(_pid.filename)
247""" % pid.DEFAULT_PID_DIR
248        result = run([sys.executable, '-c', s])
249        returncode = result if isinstance(result, int) else result.returncode
250        assert returncode == 0
251        assert os.path.exists(_pid.filename)
252    assert not os.path.exists(_pid.filename)
253
254
255def test_pid_already_running():
256    with pid.PidFile(lock_pidfile=False) as _pid:
257        with raising(pid.PidFileAlreadyRunningError):
258            with pid.PidFile(lock_pidfile=False):
259                pass
260        assert os.path.exists(_pid.filename)
261    assert not os.path.exists(_pid.filename)
262
263
264def test_pid_already_running_custom_name():
265    with pid.PidFile(lock_pidfile=False, pidname="testpidfile") as _pid:
266        with raising(pid.PidFileAlreadyRunningError):
267            with pid.PidFile(lock_pidfile=False, pidname="testpidfile"):
268                pass
269        assert os.path.exists(_pid.filename)
270    assert not os.path.exists(_pid.filename)
271
272
273def test_pid_already_running_exception_has_pid_value():
274    with pid.PidFile(lock_pidfile=False, pidname="testpidfile") as _pid:
275        with pytest.raises(pid.PidFileAlreadyRunningError) as excinfo:
276            with pid.PidFile(lock_pidfile=False, pidname="testpidfile"):
277                pass
278        assert excinfo.value.pid is not None
279        assert excinfo.value.pid > 0
280        assert os.path.exists(_pid.filename)
281    assert not os.path.exists(_pid.filename)
282
283
284def test_pid_decorator():
285    from pid.decorator import pidfile
286
287    @pidfile()
288    def test_decorator():
289        pass
290
291    test_decorator()
292
293
294def test_pid_decorator_already_locked():
295    from pid.decorator import pidfile
296
297    @pidfile("testpiddecorator")
298    def test_decorator():
299        with raising(pid.PidFileAlreadyLockedError):
300            @pidfile("testpiddecorator")
301            def test_decorator2():
302                pass
303            test_decorator2()
304
305    test_decorator()
306
307
308def test_pid_already_closed():
309    pidfile = pid.PidFile()
310    pidfile.create()
311    try:
312        pidfile.fh.close()
313    finally:
314        pidfile.close()
315    assert not os.path.exists(pidfile.filename)
316
317
318def test_pid_multiplecreate():
319    pidfile = pid.PidFile()
320    pidfile.create()
321    try:
322        with raising(pid.PidFileAlreadyRunningError, pid.PidFileAlreadyLockedError):
323            pidfile.create()
324    finally:
325        pidfile.close()
326    assert not os.path.exists(pidfile.filename)
327
328
329@pytest.mark.skipif(sys.platform == "win32", reason="os.getgid() does not exist on windows")
330def test_pid_gid():
331    gid = os.getgid()
332    with pid.PidFile(gid=gid) as pidfile:
333        pass
334    assert not os.path.exists(pidfile.filename)
335
336
337@pytest.mark.skipif(sys.platform != "win32", reason="only runs on windows")
338def test_pid_gid_win32():
339    gid = 123
340    with raising(pid.PidFileConfigurationError):
341        with pid.PidFile(gid=gid):
342            pass
343
344
345def test_pid_check_const_empty():
346    pidfile = pid.PidFile()
347    pidfile.setup()
348    try:
349        with open(pidfile.filename, "w") as f:
350            f.write("\n")
351        assert pidfile.check() == pid.PID_CHECK_EMPTY
352    finally:
353        pidfile.close(cleanup=True)
354    assert not os.path.exists(pidfile.filename)
355
356
357def test_pid_check_const_nofile():
358    pidfile = pid.PidFile()
359    assert pidfile.check() == pid.PID_CHECK_NOFILE
360
361
362def test_pid_check_const_samepid():
363    def check_const_samepid():
364        with pid.PidFile(allow_samepid=True) as pidfile:
365            assert pidfile.check() == pid.PID_CHECK_SAMEPID
366        assert not os.path.exists(pidfile.filename)
367
368    if sys.platform != "win32":
369        check_const_samepid()
370    else:
371        with raising(pid.PidFileConfigurationError):
372            check_const_samepid()
373
374
375def test_pid_check_const_notrunning():
376    def check_const_notrunning():
377        with pid.PidFile() as pidfile:
378            with open(pidfile.filename, "w") as f:
379                # hope this does not clash
380                f.write("999999999\n")
381                f.flush()
382                assert pidfile.check() == pid.PID_CHECK_NOTRUNNING
383        assert not os.path.exists(pidfile.filename)
384
385    if sys.platform != "win32":
386        check_const_notrunning()
387    else:
388        with raising_windows_io_error():
389            check_const_notrunning()
390
391
392def test_pid_check_already_running():
393    with pid.PidFile() as pidfile:
394        pidfile2 = pid.PidFile()
395        with raising(pid.PidFileAlreadyRunningError):
396            pidfile2.check()
397    assert not os.path.exists(pidfile.filename)
398
399
400def test_pid_check_samepid_with_blocks():
401    def check_samepid_with_blocks_separate_objects():
402        with pid.PidFile(allow_samepid=True):
403            with pid.PidFile(allow_samepid=True):
404                pass
405
406    def check_samepid_with_blocks_same_objects():
407        pidfile = pid.PidFile(allow_samepid=True)
408        with pidfile:
409            with pidfile:
410                pass
411
412        assert not os.path.exists(pidfile.filename)
413
414    if sys.platform != "win32":
415        check_samepid_with_blocks_separate_objects()
416    else:
417        with raising(pid.PidFileConfigurationError):
418            check_samepid_with_blocks_separate_objects()
419
420    if sys.platform != "win32":
421        check_samepid_with_blocks_same_objects()
422    else:
423        with raising(pid.PidFileConfigurationError):
424            check_samepid_with_blocks_same_objects()
425
426
427def test_pid_check_samepid():
428    def check_samepid():
429        pidfile = pid.PidFile(allow_samepid=True)
430
431        try:
432            pidfile.create()
433            pidfile.create()
434        finally:
435            pidfile.close()
436
437        assert not os.path.exists(pidfile.filename)
438
439    if sys.platform != "win32":
440        check_samepid()
441    else:
442        with raising(pid.PidFileConfigurationError):
443            check_samepid()
444
445
446@pytest.mark.skipif(sys.platform == "win32", reason="test not supported on win32")
447@patch("os.getpid")
448@patch("os.kill")
449def test_pid_raises_already_running_when_samepid_and_two_different_pids(mock_getpid, mock_kill):
450    pidfile_proc1 = pid.PidFile()
451    pidfile_proc2 = pid.PidFile(allow_samepid=True)
452
453    try:
454        mock_getpid.return_value = 1
455        pidfile_proc1.create()
456
457        mock_getpid.return_value = 2
458        with raising(pid.PidFileAlreadyRunningError):
459            pidfile_proc2.create()
460
461    finally:
462        pidfile_proc1.close()
463        pidfile_proc2.close()
464
465    assert not os.path.exists(pidfile_proc1.filename)
466    assert not os.path.exists(pidfile_proc2.filename)
467
468
469def test_pid_default_term_signal():
470    signal.signal(signal.SIGTERM, signal.SIG_DFL)
471
472    with pid.PidFile() as pidfile:
473        assert callable(signal.getsignal(signal.SIGTERM)) is True
474
475    assert not os.path.exists(pidfile.filename)
476
477
478def test_pid_ignore_term_signal():
479    signal.signal(signal.SIGTERM, signal.SIG_IGN)
480
481    with pid.PidFile() as pidfile:
482        assert signal.getsignal(signal.SIGTERM) == signal.SIG_IGN
483
484    assert not os.path.exists(pidfile.filename)
485
486
487def test_pid_custom_term_signal():
488    def _noop(*args, **kwargs):
489        pass
490
491    signal.signal(signal.SIGTERM, _noop)
492
493    with pid.PidFile() as pidfile:
494        assert signal.getsignal(signal.SIGTERM) == _noop
495
496    assert not os.path.exists(pidfile.filename)
497
498
499# def test_pid_unknown_term_signal():
500#     # Not sure how to properly test this when signal.getsignal returns None
501#     #  - perhaps by writing a C extension which might get ugly
502#     #
503#     with pid.PidFile():
504#         assert signal.getsignal(signal.SIGTERM) == None
505
506
507def test_double_close_race_condition():
508    # https://github.com/trbs/pid/issues/22
509    pidfile1 = pid.PidFile()
510    pidfile2 = pid.PidFile()
511
512    try:
513        pidfile1.create()
514        assert os.path.exists(pidfile1.filename)
515    finally:
516        pidfile1.close()
517        assert not os.path.exists(pidfile1.filename)
518
519    try:
520        pidfile2.create()
521        assert os.path.exists(pidfile2.filename)
522
523        # simulate calling atexit in process of pidfile1
524        pidfile1.close()
525
526        assert os.path.exists(pidfile2.filename)
527    finally:
528        pidfile2.close()
529
530    assert not os.path.exists(pidfile1.filename)
531    assert not os.path.exists(pidfile2.filename)
532
533
534@pytest.mark.skipif(sys.version_info < (3, 2), reason="requires python3.2 or higher")
535def test_pid_contextdecorator():
536    @pid.PidFile()
537    def test_decorator():
538        pass
539
540    test_decorator()
541
542
543@pytest.mark.skipif(sys.version_info < (3, 2), reason="requires python3.2 or higher")
544def test_pid_contextdecorator_already_locked():
545    @pid.PidFile("testpiddecorator")
546    def test_decorator():
547        with raising(pid.PidFileAlreadyLockedError):
548            @pid.PidFile("testpiddecorator")
549            def test_decorator2():
550                pass
551            test_decorator2()
552
553    test_decorator()
554
555
556@pytest.mark.skipif(sys.version_info < (3, 5), reason="requires python3.5 or higher")
557@patch("atexit.register", autospec=True)
558def test_register_atexit_false(mock_atexit_register):
559    with pid.PidFile(register_atexit=False):
560        mock_atexit_register.assert_not_called()
561
562
563@patch("atexit.register", autospec=True)
564def test_register_atexit_true(mock_atexit_register):
565    with pid.PidFile(register_atexit=True) as pidfile:
566        mock_atexit_register.assert_called_once_with(pidfile.close)
567