1# Python imports 2# Don't add any from __future__ imports here. This code should execute 3# against standard Python. 4import unittest 5from unittest import skipUnless 6import time 7import signal 8import threading 9 10# Project imports 11import posix_ipc 12# Hack -- add tests directory to sys.path so Python 3 can find base.py. 13import sys 14import os 15sys.path.insert(0, os.path.join(os.getcwd(), 'tests')) 16import base as tests_base # noqa 17 18if hasattr(posix_ipc, 'USER_SIGNAL_MIN'): 19 # Due to Python bug http://bugs.python.org/issue20584, not all valid signal 20 # values can be used. I noticed it on FreeBSD, it might be visible 21 # elsewhere. 22 if posix_ipc.USER_SIGNAL_MIN >= signal.NSIG: 23 SIGNAL_VALUE = signal.SIGHUP 24 else: 25 SIGNAL_VALUE = posix_ipc.USER_SIGNAL_MIN 26else: 27 SIGNAL_VALUE = signal.SIGHUP 28 29signal_handler_value_received = 0 30 31 32def signal_handler(signal_value, frame): 33 """Handle signal sent for msg q notification test.""" 34 global signal_handler_value_received 35 signal_handler_value_received = signal_value 36 37 38def threaded_notification_handler_one_shot(test_case_instance): 39 """Handle msg q notification in a thread without rearming notification.""" 40 test_case_instance.threaded_notification_called = True 41 test_case_instance.notification_event.set() 42 43 44def threaded_notification_handler_rearm(test_case_instance): 45 """Handle msg q notification in a thread and rearm notification.""" 46 # Rearm. 47 param = (threaded_notification_handler_rearm, test_case_instance) 48 test_case_instance.mq.request_notification(param) 49 50 test_case_instance.threaded_notification_called = True 51 test_case_instance.notification_event.set() 52 53 54class MessageQueueTestBase(tests_base.Base): 55 """base class for MessageQueue test classes""" 56 def setUp(self): 57 self.mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX) 58 59 def tearDown(self): 60 if self.mq: 61 self.mq.close() 62 self.mq.unlink() 63 64 def assertWriteToReadOnlyPropertyFails(self, property_name, value): 65 """test that writing to a readonly property raises TypeError""" 66 tests_base.Base.assertWriteToReadOnlyPropertyFails(self, self.mq, property_name, value) 67 68 69@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support") 70class TestMessageQueueCreation(MessageQueueTestBase): 71 """Exercise stuff related to creating MessageQueue""" 72 def test_no_flags(self): 73 """tests that opening a queue with no flags opens the existing 74 queue and doesn't create a new queue""" 75 mq_copy = posix_ipc.MessageQueue(self.mq.name) 76 self.assertEqual(self.mq.name, mq_copy.name) 77 mq_copy.close() 78 79 def test_o_creat_existing(self): 80 """tests posix_ipc.O_CREAT to open an existing MessageQueue without 81 O_EXCL""" 82 mq_copy = posix_ipc.MessageQueue(self.mq.name, posix_ipc.O_CREAT) 83 self.assertEqual(self.mq.name, mq_copy.name) 84 mq_copy.close() 85 86 def test_o_creat_new(self): 87 """tests posix_ipc.O_CREAT to create a new MessageQueue without 88 O_EXCL""" 89 # I can't pass None for the name unless I also pass O_EXCL. 90 name = tests_base.make_name() 91 92 # Note: this method of finding an unused name is vulnerable to a 93 # race condition. It's good enough for test, but don't copy it for 94 # use in production code! 95 name_is_available = False 96 while not name_is_available: 97 try: 98 mq = posix_ipc.MessageQueue(name) 99 mq.close() 100 except posix_ipc.ExistentialError: 101 name_is_available = True 102 else: 103 name = tests_base.make_name() 104 105 mq = posix_ipc.MessageQueue(name, posix_ipc.O_CREAT) 106 self.assertIsNotNone(mq) 107 mq.close() 108 mq.unlink() 109 110 def test_o_crex(self): 111 """tests O_CREX prevents opening an existing MessageQueue""" 112 self.assertRaises(posix_ipc.ExistentialError, posix_ipc.MessageQueue, 113 self.mq.name, posix_ipc.O_CREX) 114 115 def test_randomly_generated_name(self): 116 """tests that the randomly-generated name works""" 117 # This is tested implicitly elsewhere but I want an explicit test 118 mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX) 119 self.assertIsNotNone(mq.name) 120 121 self.assertEqual(mq.name[0], '/') 122 self.assertGreaterEqual(len(mq.name), 2) 123 mq.close() 124 mq.unlink() 125 126 def test_name_as_bytes(self): 127 """Test that the name can be bytes. 128 129 In Python 2, bytes == str. This test is really only interesting in Python 3. 130 """ 131 if tests_base.IS_PY3: 132 name = bytes(tests_base.make_name(), 'ASCII') 133 else: 134 name = bytes(tests_base.make_name()) 135 mq = posix_ipc.MessageQueue(name, posix_ipc.O_CREX) 136 # No matter what the name is passed as, posix_ipc.name returns the default string type, 137 # i.e. str in Python 2 and unicode in Python 3. 138 if tests_base.IS_PY3: 139 self.assertEqual(name, bytes(mq.name, 'ASCII')) 140 else: 141 self.assertEqual(name, mq.name) 142 mq.unlink() 143 mq.close() 144 145 def test_name_as_unicode(self): 146 """Test that the name can be unicode. 147 148 In Python 3, str == unicode. This test is really only interesting in Python 2. 149 """ 150 if tests_base.IS_PY3: 151 name = tests_base.make_name() 152 else: 153 name = unicode(tests_base.make_name(), 'ASCII') 154 mq = posix_ipc.MessageQueue(name, posix_ipc.O_CREX) 155 self.assertEqual(name, mq.name) 156 mq.unlink() 157 mq.close() 158 159 # don't bother testing mode, it's ignored by the OS? 160 161 def test_max_messages(self): 162 """test that the max_messages param is respected""" 163 mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX, max_messages=1) 164 mq.send('foo') 165 self.assertRaises(posix_ipc.BusyError, mq.send, 'bar', timeout=0) 166 mq.close() 167 mq.unlink() 168 169 def test_max_message_size(self): 170 """test that the max_message_size param is respected""" 171 mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX, 172 max_message_size=10) 173 self.assertRaises(ValueError, mq.send, ' ' * 11) 174 mq.close() 175 mq.unlink() 176 177 def test_read_flag_new_queue(self): 178 """test that the read flag is respected on a new queue""" 179 mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX, read=False) 180 mq.send('foo') 181 self.assertRaises(posix_ipc.PermissionsError, mq.receive) 182 mq.close() 183 mq.unlink() 184 185 def test_read_flag_existing_queue(self): 186 """test that the read flag is respected on an existing queue""" 187 mq = posix_ipc.MessageQueue(self.mq.name, read=False) 188 mq.send('foo') 189 self.assertRaises(posix_ipc.PermissionsError, mq.receive) 190 mq.close() 191 192 def test_write_flag_new_queue(self): 193 """test that the write flag is respected on a new queue""" 194 mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX, write=False) 195 self.assertRaises(posix_ipc.PermissionsError, mq.send, 'foo') 196 mq.close() 197 mq.unlink() 198 199 def test_write_flag_existing_queue(self): 200 """test that the write flag is respected on an existing queue""" 201 mq = posix_ipc.MessageQueue(self.mq.name, write=False) 202 self.assertRaises(posix_ipc.PermissionsError, mq.send, 'foo') 203 mq.close() 204 205 def test_kwargs(self): 206 """ensure init accepts keyword args as advertised""" 207 # mode 0x180 = 0600. Octal is difficult to express in Python 2/3 compatible code. 208 mq = posix_ipc.MessageQueue(None, flags=posix_ipc.O_CREX, mode=0x180, max_messages=1, 209 max_message_size=256, read=True, write=True) 210 mq.close() 211 mq.unlink() 212 213 214@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support") 215class TestMessageQueueSendReceive(MessageQueueTestBase): 216 """Exercise send() and receive()""" 217 def test_send(self): 218 """Test that simple send works. 219 220 It's already tested elsewhere implicitly, but I want an explicit 221 test. 222 """ 223 self.mq.send('foo') 224 225 # FIXME how to test that send with timeout=None waits as expected? 226 227 def test_send_timeout_keyword(self): 228 """Test that the timeout keyword of send works""" 229 n_msgs = self.mq.max_messages 230 while n_msgs: 231 self.mq.send(' ') 232 n_msgs -= 1 233 234 self.assertRaises(posix_ipc.BusyError, self.mq.send, 'foo', timeout=0) 235 236 def test_send_timeout_positional(self): 237 """Test that the timeout positional param of send works""" 238 n_msgs = self.mq.max_messages 239 while n_msgs: 240 self.mq.send(' ') 241 n_msgs -= 1 242 243 self.assertRaises(posix_ipc.BusyError, self.mq.send, 'foo', 0) 244 245 def test_send_timeout_zero_success(self): 246 """Test that send w/a timeout=0 succeeds if queue is not full.""" 247 self.mq.send('foo', timeout=0) 248 249 def test_send_timeout_zero_fails(self): 250 """Test that send w/a timeout=0 raises BusyError if queue is 251 full. 252 """ 253 n_msgs = self.mq.max_messages 254 while n_msgs: 255 self.mq.send(' ') 256 n_msgs -= 1 257 258 self.assertRaises(posix_ipc.BusyError, self.mq.send, 'foo', 259 timeout=0) 260 261 def test_send_nonzero_timeout(self): 262 """Test that a non-zero timeout to send is respected.""" 263 n_msgs = self.mq.max_messages 264 while n_msgs: 265 self.mq.send(' ') 266 n_msgs -= 1 267 268 start = time.time() 269 self.assertRaises(posix_ipc.BusyError, self.mq.send, ' ', 270 timeout=1.0) 271 elapsed = time.time() - start 272 # I don't insist on extreme precision. 273 self.assertTrue(elapsed >= 1.0) 274 self.assertTrue(elapsed < 1.5) 275 276 def test_send_priority_default(self): 277 """Test that the send priority defaults to 0""" 278 self.mq.send('foo') 279 self.assertEqual(self.mq.receive(), ('foo'.encode(), 0)) 280 281 def test_send_fifo_default(self): 282 """Test that the queue order is FIFO by default""" 283 alphabet = 'abcdefg' 284 for c in alphabet: 285 self.mq.send(c) 286 287 for c in alphabet: 288 self.assertEqual(self.mq.receive(), (c.encode(), 0)) 289 290 def test_send_priority(self): 291 """Test that the priority param is respected""" 292 # By simple FIFO, these would be returned lowest, highest, middle. 293 # Instead they'll be returned highest, middle, lowest 294 self.mq.send('lowest', priority=1) 295 self.mq.send('highest', priority=3) 296 self.mq.send('middle', priority=2) 297 298 self.assertEqual(self.mq.receive(), ('highest'.encode(), 3)) 299 self.assertEqual(self.mq.receive(), ('middle'.encode(), 2)) 300 self.assertEqual(self.mq.receive(), ('lowest'.encode(), 1)) 301 302 def test_send_priority_keyword(self): 303 """Test that the priority keyword of send works""" 304 self.mq.send('foo', priority=42) 305 self.assertEqual(self.mq.receive(), ('foo'.encode(), 42)) 306 307 def test_send_priority_positional(self): 308 """Test that the priority positional param of send works""" 309 self.mq.send('foo', 0, 42) 310 self.assertEqual(self.mq.receive(), ('foo'.encode(), 42)) 311 312 def test_send_kwargs(self): 313 """ensure send() accepts keyword args as advertised""" 314 self.mq.send('foo', timeout=0, priority=0) 315 316 # ##### test receive() 317 318 def test_receive(self): 319 """Test that simple receive works. 320 321 It's already tested elsewhere implicitly, but I want an explicit 322 test. 323 """ 324 self.mq.send('foo', priority=3) 325 326 self.assertEqual(self.mq.receive(), ('foo'.encode(), 3)) 327 328 def test_receive_timeout_positional(self): 329 """Test that the timeout positional param of receive works.""" 330 self.assertRaises(posix_ipc.BusyError, self.mq.receive, 0) 331 332 def test_receive_nonzero_timeout(self): 333 """Test that a non-zero timeout to receive is respected.""" 334 start = time.time() 335 self.assertRaises(posix_ipc.BusyError, self.mq.receive, 1.0) 336 elapsed = time.time() - start 337 # I don't insist on extreme precision. 338 self.assertTrue(elapsed >= 1.0) 339 self.assertTrue(elapsed < 1.5) 340 341 # FIXME how to test that timeout=None waits forever? 342 343 344@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support") 345class TestMessageQueueNotification(MessageQueueTestBase): 346 """exercise request_notification()""" 347 def test_request_notification_signal(self): 348 """Exercise notification by signal""" 349 global someone_rang_the_doorbell 350 351 self.mq.request_notification(SIGNAL_VALUE) 352 353 signal.signal(SIGNAL_VALUE, signal_handler) 354 355 someone_rang_the_doorbell = False 356 357 self.mq.send('') 358 359 self.assertEqual(signal_handler_value_received, SIGNAL_VALUE) 360 361 def test_request_notification_signal_one_shot(self): 362 """Test that after notification by signal, notification is 363 cancelled""" 364 global signal_handler_value_received 365 366 self.mq.request_notification(SIGNAL_VALUE) 367 368 signal.signal(SIGNAL_VALUE, signal_handler) 369 370 signal_handler_value_received = 0 371 372 self.mq.send('') 373 374 self.assertEqual(signal_handler_value_received, SIGNAL_VALUE) 375 376 # Reset the global flag 377 signal_handler_value_received = 0 378 379 self.mq.send('') 380 381 # Flag should not be set because it's only supposed to fire 382 # notification when the queue changes from empty to non-empty, 383 # and there was already 1 msg in the Q when I called send() above. 384 self.assertEqual(signal_handler_value_received, 0) 385 386 # empty the queue 387 self.mq.receive() 388 self.mq.receive() 389 390 self.assertEqual(signal_handler_value_received, 0) 391 392 self.mq.send('') 393 394 # Flag should still not be set because notification was cancelled 395 # after it fired the first time. 396 self.assertEqual(signal_handler_value_received, 0) 397 398 def test_request_notification_cancel_default(self): 399 """Test that notification can be cancelled with the default param""" 400 global signal_handler_value_received 401 402 self.mq.request_notification(SIGNAL_VALUE) 403 404 signal.signal(SIGNAL_VALUE, signal_handler) 405 406 signal_handler_value_received = 0 407 408 # Cancel notification 409 self.mq.request_notification() 410 411 self.mq.send('') 412 413 self.assertEqual(signal_handler_value_received, 0) 414 415 def test_request_notification_cancel_multiple(self): 416 """Test that notification can be cancelled multiple times""" 417 self.mq.request_notification(SIGNAL_VALUE) 418 419 # Cancel notification lots of times 420 for i in range(1000): 421 self.mq.request_notification() 422 423 def test_request_notification_threaded_one_shot(self): 424 """Test simple threaded notification""" 425 426 self.threaded_notification_called = False 427 428 param = (threaded_notification_handler_one_shot, self) 429 self.mq.request_notification(param) 430 431 self.notification_event = threading.Event() 432 433 self.mq.send('') 434 435 # I have to wait on a shared event to ensure that the notification 436 # handler's thread gets a chance to execute before I make my 437 # assertion. 438 self.notification_event.wait(5) 439 440 self.assertTrue(self.threaded_notification_called) 441 442 def test_request_notification_threaded_rearm(self): 443 """Test threaded notification in which the notified thread rearms 444 the notification""" 445 446 self.threaded_notification_called = False 447 448 param = (threaded_notification_handler_rearm, self) 449 self.mq.request_notification(param) 450 451 self.notification_event = threading.Event() 452 453 # Re-arm several times. 454 for i in range(10): 455 self.mq.send('') 456 457 # I have to wait on a shared event to ensure that the 458 # notification handler's thread gets a chance to execute before 459 # I make my assertion. 460 self.notification_event.wait(5) 461 462 self.assertTrue(self.threaded_notification_called) 463 464 self.mq.receive() 465 466 self.notification_event.clear() 467 468 469@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support") 470class TestMessageQueueDestruction(MessageQueueTestBase): 471 """exercise close() and unlink()""" 472 473 def test_close_and_unlink(self): 474 """tests that mq.close() and mq.unlink() work""" 475 # mq.close() is hard to test since subsequent use of the semaphore 476 # after mq.close() is undefined. All I can think of to do is call it 477 # and note that it does not fail. Also, it allows mq.unlink() to 478 # tell the OS to delete the semaphore entirely, so it makes sense 479 # to test them together, 480 self.mq.unlink() 481 self.mq.close() 482 self.assertRaises(posix_ipc.ExistentialError, posix_ipc.MessageQueue, 483 self.mq.name) 484 485 # Wipe this out so that self.tearDown() doesn't crash. 486 self.mq = None 487 488 489@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support") 490class TestMessageQueuePropertiesAndAttributes(MessageQueueTestBase): 491 """Exercise props and attrs""" 492 def test_property_name(self): 493 """exercise MessageQueue.name""" 494 self.assertGreaterEqual(len(self.mq.name), 2) 495 496 self.assertEqual(self.mq.name[0], '/') 497 498 self.assertWriteToReadOnlyPropertyFails('name', 'hello world') 499 500 def test_property_mqd(self): 501 """exercise MessageQueue.mqd""" 502 self.assertNotEqual(self.mq.mqd, -1) 503 504 # The mqd is of type mqd_t. I can't find doc that states what this 505 # type is. All I know is that -1 is an error so it's probably 506 # int-ish, but I can't tell exactly what to expect. 507 self.assertWriteToReadOnlyPropertyFails('mqd', 42) 508 509 def test_property_max_messages(self): 510 """exercise MessageQueue.max_messages""" 511 self.assertGreaterEqual(self.mq.max_messages, 0) 512 513 self.assertWriteToReadOnlyPropertyFails('max_messages', 42) 514 515 def test_property_max_message_size(self): 516 """exercise MessageQueue.max_message_size""" 517 self.assertGreaterEqual(self.mq.max_message_size, 0) 518 519 self.assertWriteToReadOnlyPropertyFails('max_message_size', 42) 520 521 def test_property_current_messages(self): 522 """exercise MessageQueue.current_messages""" 523 self.assertEqual(self.mq.current_messages, 0) 524 525 self.mq.send('') 526 self.assertEqual(self.mq.current_messages, 1) 527 self.mq.send('') 528 self.mq.send('') 529 self.mq.send('') 530 self.assertEqual(self.mq.current_messages, 4) 531 self.mq.receive() 532 self.assertEqual(self.mq.current_messages, 3) 533 self.mq.receive() 534 self.assertEqual(self.mq.current_messages, 2) 535 self.mq.receive() 536 self.assertEqual(self.mq.current_messages, 1) 537 self.mq.receive() 538 self.assertEqual(self.mq.current_messages, 0) 539 540 self.assertWriteToReadOnlyPropertyFails('current_messages', 42) 541 542 def test_block_flag_default_value_and_writability(self): 543 """test that the block flag is True by default and can be changed""" 544 mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX) 545 546 self.assertTrue(mq.block) 547 548 # Toggle. I expect no errors. It's good to test this on a queue 549 # that's only used for this particular test. I would rather have 550 # all the other tests execute using the default value of block 551 # on self.mq. 552 mq.block = False 553 mq.block = True 554 555 mq.close() 556 mq.unlink() 557 558 def test_block_flag_false(self): 559 """test blocking behavior when flag is false""" 560 mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX, max_messages=3) 561 562 mq.block = False 563 564 # Queue is empty, so receive() should immediately raise BusyError 565 start = time.time() 566 self.assertRaises(posix_ipc.BusyError, mq.receive, 10) 567 elapsed = time.time() - start 568 569 # Not only should receive() have raised BusyError, it should have 570 # done so "immediately". I don't insist on extreme precision since 571 # OS-level task switching might mean that elapsed time is not 572 # vanishingly small as one might expect under most circumstances. 573 self.assertTrue(elapsed < 1.0) 574 575 # Now test send() the same way. 576 mq.send(' ') 577 mq.send(' ') 578 mq.send(' ') 579 580 # Queue is now full. 581 start = time.time() 582 self.assertRaises(posix_ipc.BusyError, mq.send, ' ', 10) 583 elapsed = time.time() - start 584 self.assertTrue(elapsed < 1.0) 585 586 mq.close() 587 mq.unlink() 588 589 590if __name__ == '__main__': 591 unittest.main() 592