1import os 2import os.path 3import sys 4import signal 5import tempfile 6import pytest 7from contextlib import contextmanager 8try: 9 from unittest.mock import patch 10except ImportError: 11 from mock import patch 12 13try: 14 from subprocess import run 15except ImportError: 16 # python2 support 17 from subprocess import call as run 18 19import pid 20 21pid.DEFAULT_PID_DIR = tempfile.gettempdir() 22 23if sys.platform == "win32": 24 # Fix backslashes on windows to properly execute "run" command 25 pid.DEFAULT_PID_DIR = pid.DEFAULT_PID_DIR.replace("\\", "/") 26 27 28# https://code.google.com/p/python-nose/issues/detail?id=175 29@contextmanager 30def raising(*exc_types): 31 """ 32 A context manager to ensure that an exception of a given list of 33 types is thrown. 34 35 Instead of:: 36 37 @nose.tools.raises(ValueError) 38 def test_that_raises(): 39 # ... lengthy setup 40 raise ValueError 41 42 you can write:: 43 44 def test_that_raises_at_the_end(): 45 # ... lengthy setup 46 with raising(ValueError): 47 raise ValueError 48 49 to make the scope for catching exceptions as small as possible. 50 """ 51 try: 52 yield 53 except exc_types: 54 pass 55 except Exception: 56 raise 57 else: 58 raise AssertionError("Failed to throw exception of type(s) %s." % (", ".join(exc_type.__name__ for exc_type in exc_types),)) 59 60 61@contextmanager 62def raising_windows_io_error(): 63 try: 64 yield 65 except IOError as exc: 66 if exc.errno != 13: 67 raise 68 except Exception: 69 raise 70 else: 71 raise AssertionError("Failed to throw exception") 72 73 74def test_pid_class(): 75 pidfile = pid.PidFile() 76 pidfile.create() 77 pidfile.close() 78 assert not os.path.exists(pidfile.filename) 79 80 81def test_pid_context_manager(): 82 with pid.PidFile() as pidfile: 83 pass 84 85 assert not os.path.exists(pidfile.filename) 86 87 88@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows") 89def test_pid_pid(): 90 with pid.PidFile() as pidfile: 91 pidnr = int(open(pidfile.filename, "r").readline().strip()) 92 assert pidnr == os.getpid(), "%s != %s" % (pidnr, os.getpid()) 93 assert not os.path.exists(pidfile.filename) 94 95 96@pytest.mark.skipif(sys.platform != "win32", reason="only runs on windows") 97def test_pid_pid_win32(): 98 def read_pidfile_data(): 99 return open(pidfile.filename, "r").readline().strip() 100 101 with pid.PidFile() as pidfile: 102 # On windows Python2 opens a file but reads an empty line from it 103 # Python3 throws IOError(13, Access denied) instead, which we are catching with raising_windows_io_error() 104 if sys.version_info.major < 3: 105 pidtext = read_pidfile_data() 106 assert pidtext == "", "Read '%s' from locked file on Windows with Python2" % (pidtext) 107 else: 108 with raising_windows_io_error(): 109 pidtext = read_pidfile_data() 110 assert not os.path.exists(pidfile.filename) 111 112 113def test_pid_custom_name(): 114 with pid.PidFile(pidname="testpidfile") as pidfile: 115 pass 116 assert not os.path.exists(pidfile.filename) 117 118 119def test_pid_enforce_dotpid_postfix(): 120 with pid.PidFile(pidname="testpidfile", enforce_dotpid_postfix=False) as pidfile: 121 assert not pidfile.filename.endswith(".pid") 122 assert not os.path.exists(pidfile.filename) 123 124 125def test_pid_force_tmpdir(): 126 with pid.PidFile(force_tmpdir=True) as pidfile: 127 pass 128 assert not os.path.exists(pidfile.filename) 129 130 131def test_pid_custom_dir(): 132 with pid.PidFile(piddir=os.path.join(pid.DEFAULT_PID_DIR, "testpidfile.dir")) as pidfile: 133 pass 134 assert not os.path.exists(pidfile.filename) 135 136 137def test_pid_piddir_exists_as_file(): 138 with tempfile.NamedTemporaryFile() as tmpfile: 139 with raising(IOError): 140 with pid.PidFile(piddir=tmpfile.name): 141 pass 142 143 144def test_pid_no_term_signal(): 145 def _noop(*args, **kwargs): 146 pass 147 148 signal.signal(signal.SIGTERM, _noop) 149 with pid.PidFile(register_term_signal_handler=False) as pidfile: 150 assert signal.getsignal(signal.SIGTERM) is _noop 151 assert not os.path.exists(pidfile.filename) 152 153 154def test_pid_term_signal(): 155 def _noop(*args, **kwargs): 156 pass 157 158 signal.signal(signal.SIGTERM, _noop) 159 with pid.PidFile(register_term_signal_handler=True) as pidfile: 160 assert signal.getsignal(signal.SIGTERM) is not _noop 161 assert not os.path.exists(pidfile.filename) 162 163 164def test_pid_force_register_term_signal_handler(): 165 def _noop(*args, **kwargs): 166 pass 167 168 def _custom_signal_func(*args, **kwargs): 169 pass 170 171 signal.signal(signal.SIGTERM, _custom_signal_func) 172 assert signal.getsignal(signal.SIGTERM) is _custom_signal_func 173 with pid.PidFile(register_term_signal_handler=True) as pidfile: 174 assert signal.getsignal(signal.SIGTERM) is not _custom_signal_func 175 assert not os.path.exists(pidfile.filename) 176 177 178def test_pid_supply_term_signal_handler(): 179 def _noop(*args, **kwargs): 180 pass 181 182 signal.signal(signal.SIGTERM, signal.SIG_IGN) 183 184 with pid.PidFile(register_term_signal_handler=_noop) as pidfile: 185 assert signal.getsignal(signal.SIGTERM) is _noop 186 assert not os.path.exists(pidfile.filename) 187 188 189@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows") 190def test_pid_chmod(): 191 with pid.PidFile(chmod=0o600) as pidfile: 192 pass 193 assert not os.path.exists(pidfile.filename) 194 195 196@pytest.mark.skipif(sys.platform != "win32", reason="only runs on windows") 197def test_pid_chmod_win32(): 198 with raising(pid.PidFileConfigurationError): 199 with pid.PidFile(chmod=0o600): 200 pass 201 202 203def test_pid_already_locked(): 204 with pid.PidFile() as _pid: 205 with raising(pid.PidFileAlreadyLockedError): 206 with pid.PidFile(): 207 pass 208 assert os.path.exists(_pid.filename) 209 assert not os.path.exists(_pid.filename) 210 211 212def test_pid_already_locked_custom_name(): 213 with pid.PidFile(pidname="testpidfile") as _pid: 214 with raising(pid.PidFileAlreadyLockedError): 215 with pid.PidFile(pidname="testpidfile"): 216 pass 217 assert os.path.exists(_pid.filename) 218 assert not os.path.exists(_pid.filename) 219 220 221def test_pid_already_locked_multi_process(): 222 pidname = "test_pid_already_locked_multi_process" 223 piddir = pid.DEFAULT_PID_DIR 224 with pid.PidFile(pidname=pidname, piddir=piddir) as _pid: 225 s = """ 226import sys, pid 227try: 228 with pid.PidFile(pidname="%s", piddir="%s"): 229 pass 230except pid.PidFileAlreadyLockedError: 231 sys.exit(123) 232""" % (pidname, piddir) 233 result = run([sys.executable, '-c', s]) 234 returncode = result if isinstance(result, int) else result.returncode 235 assert returncode == 123 236 assert os.path.exists(_pid.filename) 237 assert not os.path.exists(_pid.filename) 238 239 240def test_pid_two_locks_multi_process(): 241 with pid.PidFile() as _pid: 242 s = """ 243import os, pid 244with pid.PidFile("pytest2", piddir="%s") as _pid: 245 assert os.path.exists(_pid.filename) 246assert not os.path.exists(_pid.filename) 247""" % pid.DEFAULT_PID_DIR 248 result = run([sys.executable, '-c', s]) 249 returncode = result if isinstance(result, int) else result.returncode 250 assert returncode == 0 251 assert os.path.exists(_pid.filename) 252 assert not os.path.exists(_pid.filename) 253 254 255def test_pid_already_running(): 256 with pid.PidFile(lock_pidfile=False) as _pid: 257 with raising(pid.PidFileAlreadyRunningError): 258 with pid.PidFile(lock_pidfile=False): 259 pass 260 assert os.path.exists(_pid.filename) 261 assert not os.path.exists(_pid.filename) 262 263 264def test_pid_already_running_custom_name(): 265 with pid.PidFile(lock_pidfile=False, pidname="testpidfile") as _pid: 266 with raising(pid.PidFileAlreadyRunningError): 267 with pid.PidFile(lock_pidfile=False, pidname="testpidfile"): 268 pass 269 assert os.path.exists(_pid.filename) 270 assert not os.path.exists(_pid.filename) 271 272 273def test_pid_already_running_exception_has_pid_value(): 274 with pid.PidFile(lock_pidfile=False, pidname="testpidfile") as _pid: 275 with pytest.raises(pid.PidFileAlreadyRunningError) as excinfo: 276 with pid.PidFile(lock_pidfile=False, pidname="testpidfile"): 277 pass 278 assert excinfo.value.pid is not None 279 assert excinfo.value.pid > 0 280 assert os.path.exists(_pid.filename) 281 assert not os.path.exists(_pid.filename) 282 283 284def test_pid_decorator(): 285 from pid.decorator import pidfile 286 287 @pidfile() 288 def test_decorator(): 289 pass 290 291 test_decorator() 292 293 294def test_pid_decorator_already_locked(): 295 from pid.decorator import pidfile 296 297 @pidfile("testpiddecorator") 298 def test_decorator(): 299 with raising(pid.PidFileAlreadyLockedError): 300 @pidfile("testpiddecorator") 301 def test_decorator2(): 302 pass 303 test_decorator2() 304 305 test_decorator() 306 307 308def test_pid_already_closed(): 309 pidfile = pid.PidFile() 310 pidfile.create() 311 try: 312 pidfile.fh.close() 313 finally: 314 pidfile.close() 315 assert not os.path.exists(pidfile.filename) 316 317 318def test_pid_multiplecreate(): 319 pidfile = pid.PidFile() 320 pidfile.create() 321 try: 322 with raising(pid.PidFileAlreadyRunningError, pid.PidFileAlreadyLockedError): 323 pidfile.create() 324 finally: 325 pidfile.close() 326 assert not os.path.exists(pidfile.filename) 327 328 329@pytest.mark.skipif(sys.platform == "win32", reason="os.getgid() does not exist on windows") 330def test_pid_gid(): 331 gid = os.getgid() 332 with pid.PidFile(gid=gid) as pidfile: 333 pass 334 assert not os.path.exists(pidfile.filename) 335 336 337@pytest.mark.skipif(sys.platform != "win32", reason="only runs on windows") 338def test_pid_gid_win32(): 339 gid = 123 340 with raising(pid.PidFileConfigurationError): 341 with pid.PidFile(gid=gid): 342 pass 343 344 345def test_pid_check_const_empty(): 346 pidfile = pid.PidFile() 347 pidfile.setup() 348 try: 349 with open(pidfile.filename, "w") as f: 350 f.write("\n") 351 assert pidfile.check() == pid.PID_CHECK_EMPTY 352 finally: 353 pidfile.close(cleanup=True) 354 assert not os.path.exists(pidfile.filename) 355 356 357def test_pid_check_const_nofile(): 358 pidfile = pid.PidFile() 359 assert pidfile.check() == pid.PID_CHECK_NOFILE 360 361 362def test_pid_check_const_samepid(): 363 def check_const_samepid(): 364 with pid.PidFile(allow_samepid=True) as pidfile: 365 assert pidfile.check() == pid.PID_CHECK_SAMEPID 366 assert not os.path.exists(pidfile.filename) 367 368 if sys.platform != "win32": 369 check_const_samepid() 370 else: 371 with raising(pid.PidFileConfigurationError): 372 check_const_samepid() 373 374 375def test_pid_check_const_notrunning(): 376 def check_const_notrunning(): 377 with pid.PidFile() as pidfile: 378 with open(pidfile.filename, "w") as f: 379 # hope this does not clash 380 f.write("999999999\n") 381 f.flush() 382 assert pidfile.check() == pid.PID_CHECK_NOTRUNNING 383 assert not os.path.exists(pidfile.filename) 384 385 if sys.platform != "win32": 386 check_const_notrunning() 387 else: 388 with raising_windows_io_error(): 389 check_const_notrunning() 390 391 392def test_pid_check_already_running(): 393 with pid.PidFile() as pidfile: 394 pidfile2 = pid.PidFile() 395 with raising(pid.PidFileAlreadyRunningError): 396 pidfile2.check() 397 assert not os.path.exists(pidfile.filename) 398 399 400def test_pid_check_samepid_with_blocks(): 401 def check_samepid_with_blocks_separate_objects(): 402 with pid.PidFile(allow_samepid=True): 403 with pid.PidFile(allow_samepid=True): 404 pass 405 406 def check_samepid_with_blocks_same_objects(): 407 pidfile = pid.PidFile(allow_samepid=True) 408 with pidfile: 409 with pidfile: 410 pass 411 412 assert not os.path.exists(pidfile.filename) 413 414 if sys.platform != "win32": 415 check_samepid_with_blocks_separate_objects() 416 else: 417 with raising(pid.PidFileConfigurationError): 418 check_samepid_with_blocks_separate_objects() 419 420 if sys.platform != "win32": 421 check_samepid_with_blocks_same_objects() 422 else: 423 with raising(pid.PidFileConfigurationError): 424 check_samepid_with_blocks_same_objects() 425 426 427def test_pid_check_samepid(): 428 def check_samepid(): 429 pidfile = pid.PidFile(allow_samepid=True) 430 431 try: 432 pidfile.create() 433 pidfile.create() 434 finally: 435 pidfile.close() 436 437 assert not os.path.exists(pidfile.filename) 438 439 if sys.platform != "win32": 440 check_samepid() 441 else: 442 with raising(pid.PidFileConfigurationError): 443 check_samepid() 444 445 446@pytest.mark.skipif(sys.platform == "win32", reason="test not supported on win32") 447@patch("os.getpid") 448@patch("os.kill") 449def test_pid_raises_already_running_when_samepid_and_two_different_pids(mock_getpid, mock_kill): 450 pidfile_proc1 = pid.PidFile() 451 pidfile_proc2 = pid.PidFile(allow_samepid=True) 452 453 try: 454 mock_getpid.return_value = 1 455 pidfile_proc1.create() 456 457 mock_getpid.return_value = 2 458 with raising(pid.PidFileAlreadyRunningError): 459 pidfile_proc2.create() 460 461 finally: 462 pidfile_proc1.close() 463 pidfile_proc2.close() 464 465 assert not os.path.exists(pidfile_proc1.filename) 466 assert not os.path.exists(pidfile_proc2.filename) 467 468 469def test_pid_default_term_signal(): 470 signal.signal(signal.SIGTERM, signal.SIG_DFL) 471 472 with pid.PidFile() as pidfile: 473 assert callable(signal.getsignal(signal.SIGTERM)) is True 474 475 assert not os.path.exists(pidfile.filename) 476 477 478def test_pid_ignore_term_signal(): 479 signal.signal(signal.SIGTERM, signal.SIG_IGN) 480 481 with pid.PidFile() as pidfile: 482 assert signal.getsignal(signal.SIGTERM) == signal.SIG_IGN 483 484 assert not os.path.exists(pidfile.filename) 485 486 487def test_pid_custom_term_signal(): 488 def _noop(*args, **kwargs): 489 pass 490 491 signal.signal(signal.SIGTERM, _noop) 492 493 with pid.PidFile() as pidfile: 494 assert signal.getsignal(signal.SIGTERM) == _noop 495 496 assert not os.path.exists(pidfile.filename) 497 498 499# def test_pid_unknown_term_signal(): 500# # Not sure how to properly test this when signal.getsignal returns None 501# # - perhaps by writing a C extension which might get ugly 502# # 503# with pid.PidFile(): 504# assert signal.getsignal(signal.SIGTERM) == None 505 506 507def test_double_close_race_condition(): 508 # https://github.com/trbs/pid/issues/22 509 pidfile1 = pid.PidFile() 510 pidfile2 = pid.PidFile() 511 512 try: 513 pidfile1.create() 514 assert os.path.exists(pidfile1.filename) 515 finally: 516 pidfile1.close() 517 assert not os.path.exists(pidfile1.filename) 518 519 try: 520 pidfile2.create() 521 assert os.path.exists(pidfile2.filename) 522 523 # simulate calling atexit in process of pidfile1 524 pidfile1.close() 525 526 assert os.path.exists(pidfile2.filename) 527 finally: 528 pidfile2.close() 529 530 assert not os.path.exists(pidfile1.filename) 531 assert not os.path.exists(pidfile2.filename) 532 533 534@pytest.mark.skipif(sys.version_info < (3, 2), reason="requires python3.2 or higher") 535def test_pid_contextdecorator(): 536 @pid.PidFile() 537 def test_decorator(): 538 pass 539 540 test_decorator() 541 542 543@pytest.mark.skipif(sys.version_info < (3, 2), reason="requires python3.2 or higher") 544def test_pid_contextdecorator_already_locked(): 545 @pid.PidFile("testpiddecorator") 546 def test_decorator(): 547 with raising(pid.PidFileAlreadyLockedError): 548 @pid.PidFile("testpiddecorator") 549 def test_decorator2(): 550 pass 551 test_decorator2() 552 553 test_decorator() 554 555 556@pytest.mark.skipif(sys.version_info < (3, 5), reason="requires python3.5 or higher") 557@patch("atexit.register", autospec=True) 558def test_register_atexit_false(mock_atexit_register): 559 with pid.PidFile(register_atexit=False): 560 mock_atexit_register.assert_not_called() 561 562 563@patch("atexit.register", autospec=True) 564def test_register_atexit_true(mock_atexit_register): 565 with pid.PidFile(register_atexit=True) as pidfile: 566 mock_atexit_register.assert_called_once_with(pidfile.close) 567