1# Python imports
2# Don't add any from __future__ imports here. This code should execute
3# against standard Python.
4import unittest
5from unittest import skipUnless
6import time
7import signal
8import threading
9
10# Project imports
11import posix_ipc
12# Hack -- add tests directory to sys.path so Python 3 can find base.py.
13import sys
14import os
15sys.path.insert(0, os.path.join(os.getcwd(), 'tests'))
16import base as tests_base  # noqa
17
# Pick the signal that the notification tests ask the queue to deliver.
#
# Due to Python bug http://bugs.python.org/issue20584, not all valid signal
# values can be used. I noticed it on FreeBSD, it might be visible
# elsewhere. SIGHUP is therefore the default, and USER_SIGNAL_MIN only
# replaces it when posix_ipc provides it and it is below signal.NSIG.
SIGNAL_VALUE = signal.SIGHUP
if hasattr(posix_ipc, 'USER_SIGNAL_MIN'):
    if posix_ipc.USER_SIGNAL_MIN < signal.NSIG:
        SIGNAL_VALUE = posix_ipc.USER_SIGNAL_MIN

# Signal number most recently passed to signal_handler(); tests reset it to 0.
signal_handler_value_received = 0
30
31
def signal_handler(signal_value, frame):
    """Record the signal delivered for the msg q notification test.

    The signal number is stored in the module-level
    signal_handler_value_received so a test can inspect it afterwards.
    The frame argument is accepted but not used.
    """
    global signal_handler_value_received
    signal_handler_value_received = signal_value
36
37
def threaded_notification_handler_one_shot(test_case_instance):
    """Flag that the msg q notification fired, then wake the waiting test.

    This handler does not ask for another notification.
    """
    # The flag is written before the event is set so that whoever waits on
    # the event finds the flag already updated.
    test_case_instance.threaded_notification_called = True
    wake_up = test_case_instance.notification_event
    wake_up.set()
42
43
def threaded_notification_handler_rearm(test_case_instance):
    """Rearm the msg q notification, then report that it fired.

    The new notification request names this same function and the same
    test case, so the next notification ends up here again.
    """
    # Rearm before anything else.
    test_case_instance.mq.request_notification(
        (threaded_notification_handler_rearm, test_case_instance))

    # Flag first, event second, so the waiter sees the flag once woken.
    test_case_instance.threaded_notification_called = True
    test_case_instance.notification_event.set()
52
53
class MessageQueueTestBase(tests_base.Base):
    """Base class for MessageQueue test classes.

    Every test gets its own queue in self.mq. It is created in setUp() and
    closed and unlinked again in tearDown().
    """
    def setUp(self):
        """Create a new queue (name=None, O_CREX) for the test to use."""
        self.mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX)

    def tearDown(self):
        """Close and unlink self.mq, unless the test set it to None."""
        if self.mq is not None:
            try:
                self.mq.close()
            finally:
                # Unlink even if close() raised, so the queue is not left
                # behind after the test run.
                self.mq.unlink()

    def assertWriteToReadOnlyPropertyFails(self, property_name, value):
        """Check that writing value to the named property of self.mq fails.

        The check itself is done by the tests_base.Base method of the same
        name; this wrapper only supplies self.mq as the target object.
        """
        tests_base.Base.assertWriteToReadOnlyPropertyFails(self, self.mq, property_name, value)
67
68
@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support")
class TestMessageQueueCreation(MessageQueueTestBase):
    """Exercise stuff related to creating MessageQueue.

    Queues that the tests open themselves are released through
    addCleanup(), so they are closed (and unlinked, where the test created
    them) even when an assertion fails partway through a test.
    """
    @staticmethod
    def _close_and_unlink(mq):
        """Close mq and unlink it; the unlink happens even if close() raises."""
        try:
            mq.close()
        finally:
            mq.unlink()

    def _create_queue(self, *args, **kwargs):
        """Create a queue that is closed and unlinked when the test ends.

        All arguments are passed unchanged to posix_ipc.MessageQueue().
        """
        mq = posix_ipc.MessageQueue(*args, **kwargs)
        self.addCleanup(self._close_and_unlink, mq)
        return mq

    def _open_queue(self, *args, **kwargs):
        """Open an already existing queue; it is closed when the test ends.

        All arguments are passed unchanged to posix_ipc.MessageQueue(). The
        queue is not unlinked here because its creator is responsible for
        that.
        """
        mq = posix_ipc.MessageQueue(*args, **kwargs)
        self.addCleanup(mq.close)
        return mq

    def test_no_flags(self):
        """tests that opening a queue with no flags opens the existing
        queue and doesn't create a new queue"""
        mq_copy = self._open_queue(self.mq.name)
        self.assertEqual(self.mq.name, mq_copy.name)

    def test_o_creat_existing(self):
        """tests posix_ipc.O_CREAT to open an existing MessageQueue without
        O_EXCL"""
        mq_copy = self._open_queue(self.mq.name, posix_ipc.O_CREAT)
        self.assertEqual(self.mq.name, mq_copy.name)

    def test_o_creat_new(self):
        """tests posix_ipc.O_CREAT to create a new MessageQueue without
        O_EXCL"""
        # I can't pass None for the name unless I also pass O_EXCL.
        name = tests_base.make_name()

        # Note: this method of finding an unused name is vulnerable to a
        # race condition. It's good enough for test, but don't copy it for
        # use in production code!
        while True:
            try:
                probe = posix_ipc.MessageQueue(name)
            except posix_ipc.ExistentialError:
                # Nothing exists under this name, so it is free to use.
                break
            # The name is taken; release the handle and try another one.
            probe.close()
            name = tests_base.make_name()

        mq = self._create_queue(name, posix_ipc.O_CREAT)
        self.assertIsNotNone(mq)

    def test_o_crex(self):
        """tests O_CREX prevents opening an existing MessageQueue"""
        self.assertRaises(posix_ipc.ExistentialError, posix_ipc.MessageQueue,
                          self.mq.name, posix_ipc.O_CREX)

    def test_randomly_generated_name(self):
        """tests that the randomly-generated name works"""
        # This is tested implicitly elsewhere but I want an explicit test
        mq = self._create_queue(None, posix_ipc.O_CREX)
        self.assertIsNotNone(mq.name)

        self.assertEqual(mq.name[0], '/')
        self.assertGreaterEqual(len(mq.name), 2)

    def test_name_as_bytes(self):
        """Test that the name can be bytes.

        In Python 2, bytes == str. This test is really only interesting in Python 3.
        """
        if tests_base.IS_PY3:
            name = bytes(tests_base.make_name(), 'ASCII')
        else:
            name = bytes(tests_base.make_name())
        mq = self._create_queue(name, posix_ipc.O_CREX)
        # No matter what the name is passed as, posix_ipc.name returns the default string type,
        # i.e. str in Python 2 and unicode in Python 3.
        if tests_base.IS_PY3:
            self.assertEqual(name, bytes(mq.name, 'ASCII'))
        else:
            self.assertEqual(name, mq.name)

    def test_name_as_unicode(self):
        """Test that the name can be unicode.

        In Python 3, str == unicode. This test is really only interesting in Python 2.
        """
        if tests_base.IS_PY3:
            name = tests_base.make_name()
        else:
            name = unicode(tests_base.make_name(), 'ASCII')
        mq = self._create_queue(name, posix_ipc.O_CREX)
        self.assertEqual(name, mq.name)

    # don't bother testing mode, it's ignored by the OS?

    def test_max_messages(self):
        """test that the max_messages param is respected"""
        mq = self._create_queue(None, posix_ipc.O_CREX, max_messages=1)
        mq.send('foo')
        self.assertRaises(posix_ipc.BusyError, mq.send, 'bar', timeout=0)

    def test_max_message_size(self):
        """test that the max_message_size param is respected"""
        mq = self._create_queue(None, posix_ipc.O_CREX, max_message_size=10)
        self.assertRaises(ValueError, mq.send, ' ' * 11)

    def test_read_flag_new_queue(self):
        """test that the read flag is respected on a new queue"""
        mq = self._create_queue(None, posix_ipc.O_CREX, read=False)
        mq.send('foo')
        self.assertRaises(posix_ipc.PermissionsError, mq.receive)

    def test_read_flag_existing_queue(self):
        """test that the read flag is respected on an existing queue"""
        mq = self._open_queue(self.mq.name, read=False)
        mq.send('foo')
        self.assertRaises(posix_ipc.PermissionsError, mq.receive)

    def test_write_flag_new_queue(self):
        """test that the write flag is respected on a new queue"""
        mq = self._create_queue(None, posix_ipc.O_CREX, write=False)
        self.assertRaises(posix_ipc.PermissionsError, mq.send, 'foo')

    def test_write_flag_existing_queue(self):
        """test that the write flag is respected on an existing queue"""
        mq = self._open_queue(self.mq.name, write=False)
        self.assertRaises(posix_ipc.PermissionsError, mq.send, 'foo')

    def test_kwargs(self):
        """ensure init accepts keyword args as advertised"""
        # mode 0x180 = 0600. Octal is difficult to express in Python 2/3 compatible code.
        self._create_queue(None, flags=posix_ipc.O_CREX, mode=0x180, max_messages=1,
                           max_message_size=256, read=True, write=True)
212
213
@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support")
class TestMessageQueueSendReceive(MessageQueueTestBase):
    """Exercise send() and receive()"""
    def _fill_queue(self):
        """Send one-space messages to self.mq, max_messages of them.

        Called on the empty queue from setUp(), this leaves the queue full
        so that a further send() cannot succeed without waiting.
        """
        for _ in range(self.mq.max_messages):
            self.mq.send(' ')

    def test_send(self):
        """Test that simple send works.

        It's already tested elsewhere implicitly, but I want an explicit
        test.
        """
        self.mq.send('foo')

    # FIXME how to test that send with timeout=None waits as expected?

    def test_send_timeout_keyword(self):
        """Test that the timeout keyword of send works"""
        self._fill_queue()

        self.assertRaises(posix_ipc.BusyError, self.mq.send, 'foo', timeout=0)

    def test_send_timeout_positional(self):
        """Test that the timeout positional param of send works"""
        self._fill_queue()

        self.assertRaises(posix_ipc.BusyError, self.mq.send, 'foo', 0)

    def test_send_timeout_zero_success(self):
        """Test that send w/a timeout=0 succeeds if queue is not full."""
        self.mq.send('foo', timeout=0)

    def test_send_timeout_zero_fails(self):
        """Test that send w/a timeout=0 raises BusyError if queue is
        full.
        """
        self._fill_queue()

        self.assertRaises(posix_ipc.BusyError, self.mq.send, 'foo',
                          timeout=0)

    def test_send_nonzero_timeout(self):
        """Test that a non-zero timeout to send is respected."""
        self._fill_queue()

        start = time.time()
        self.assertRaises(posix_ipc.BusyError, self.mq.send, ' ',
                          timeout=1.0)
        elapsed = time.time() - start
        # I don't insist on extreme precision. The comparison asserts are
        # used so that a failure reports the elapsed time that was measured.
        self.assertGreaterEqual(elapsed, 1.0)
        self.assertLess(elapsed, 1.5)

    def test_send_priority_default(self):
        """Test that the send priority defaults to 0"""
        self.mq.send('foo')
        self.assertEqual(self.mq.receive(), ('foo'.encode(), 0))

    def test_send_fifo_default(self):
        """Test that the queue order is FIFO by default"""
        alphabet = 'abcdefg'
        for c in alphabet:
            self.mq.send(c)

        for c in alphabet:
            self.assertEqual(self.mq.receive(), (c.encode(), 0))

    def test_send_priority(self):
        """Test that the priority param is respected"""
        # By simple FIFO, these would be returned lowest, highest, middle.
        # Instead they'll be returned highest, middle, lowest
        self.mq.send('lowest', priority=1)
        self.mq.send('highest', priority=3)
        self.mq.send('middle', priority=2)

        self.assertEqual(self.mq.receive(), ('highest'.encode(), 3))
        self.assertEqual(self.mq.receive(), ('middle'.encode(), 2))
        self.assertEqual(self.mq.receive(), ('lowest'.encode(), 1))

    def test_send_priority_keyword(self):
        """Test that the priority keyword of send works"""
        self.mq.send('foo', priority=42)
        self.assertEqual(self.mq.receive(), ('foo'.encode(), 42))

    def test_send_priority_positional(self):
        """Test that the priority positional param of send works"""
        self.mq.send('foo', 0, 42)
        self.assertEqual(self.mq.receive(), ('foo'.encode(), 42))

    def test_send_kwargs(self):
        """ensure send() accepts keyword args as advertised"""
        self.mq.send('foo', timeout=0, priority=0)

    # ##### test receive()

    def test_receive(self):
        """Test that simple receive works.

        It's already tested elsewhere implicitly, but I want an explicit
        test.
        """
        self.mq.send('foo', priority=3)

        self.assertEqual(self.mq.receive(), ('foo'.encode(), 3))

    def test_receive_timeout_positional(self):
        """Test that the timeout positional param of receive works."""
        self.assertRaises(posix_ipc.BusyError, self.mq.receive, 0)

    def test_receive_nonzero_timeout(self):
        """Test that a non-zero timeout to receive is respected."""
        start = time.time()
        self.assertRaises(posix_ipc.BusyError, self.mq.receive, 1.0)
        elapsed = time.time() - start
        # I don't insist on extreme precision. The comparison asserts are
        # used so that a failure reports the elapsed time that was measured.
        self.assertGreaterEqual(elapsed, 1.0)
        self.assertLess(elapsed, 1.5)

    # FIXME how to test that timeout=None waits forever?
342
343
@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support")
class TestMessageQueueNotification(MessageQueueTestBase):
    """exercise request_notification()"""
    def _install_signal_handler(self):
        """Route SIGNAL_VALUE to signal_handler() until the test ends.

        The handler that was in place before is restored by a cleanup, so
        this test's handler does not stay installed for later tests.
        """
        previous = signal.signal(SIGNAL_VALUE, signal_handler)
        if previous is None:
            # signal.signal() returns None when the previous handler was not
            # installed from Python. None is not accepted as a handler, so
            # fall back to the default action when restoring.
            previous = signal.SIG_DFL
        self.addCleanup(signal.signal, SIGNAL_VALUE, previous)

    def test_request_notification_signal(self):
        """Exercise notification by signal"""
        global signal_handler_value_received

        # The handler goes in before the notification is armed so that the
        # signal can never arrive while no handler is installed.
        self._install_signal_handler()
        self.mq.request_notification(SIGNAL_VALUE)

        # Start from a known state; otherwise a value left behind by an
        # earlier test could make the assertion below pass by accident.
        signal_handler_value_received = 0

        self.mq.send('')

        self.assertEqual(signal_handler_value_received, SIGNAL_VALUE)

    def test_request_notification_signal_one_shot(self):
        """Test that after notification by signal, notification is
        cancelled"""
        global signal_handler_value_received

        self._install_signal_handler()
        self.mq.request_notification(SIGNAL_VALUE)

        signal_handler_value_received = 0

        self.mq.send('')

        self.assertEqual(signal_handler_value_received, SIGNAL_VALUE)

        # Reset the global flag
        signal_handler_value_received = 0

        self.mq.send('')

        # Flag should not be set because it's only supposed to fire
        # notification when the queue changes from empty to non-empty,
        # and there was already 1 msg in the Q when I called send() above.
        self.assertEqual(signal_handler_value_received, 0)

        # empty the queue
        self.mq.receive()
        self.mq.receive()

        self.assertEqual(signal_handler_value_received, 0)

        self.mq.send('')

        # Flag should still not be set because notification was cancelled
        # after it fired the first time.
        self.assertEqual(signal_handler_value_received, 0)

    def test_request_notification_cancel_default(self):
        """Test that notification can be cancelled with the default param"""
        global signal_handler_value_received

        self._install_signal_handler()
        self.mq.request_notification(SIGNAL_VALUE)

        signal_handler_value_received = 0

        # Cancel notification
        self.mq.request_notification()

        self.mq.send('')

        self.assertEqual(signal_handler_value_received, 0)

    def test_request_notification_cancel_multiple(self):
        """Test that notification can be cancelled multiple times"""
        self.mq.request_notification(SIGNAL_VALUE)

        # Cancel notification lots of times
        for i in range(1000):
            self.mq.request_notification()

    def test_request_notification_threaded_one_shot(self):
        """Test simple threaded notification"""
        # Everything the handler touches is set up before the notification
        # is armed.
        self.threaded_notification_called = False
        self.notification_event = threading.Event()

        param = (threaded_notification_handler_one_shot, self)
        self.mq.request_notification(param)

        self.mq.send('')

        # I have to wait on a shared event to ensure that the notification
        # handler's thread gets a chance to execute before I make my
        # assertion.
        self.notification_event.wait(5)

        self.assertTrue(self.threaded_notification_called)

    def test_request_notification_threaded_rearm(self):
        """Test threaded notification in which the notified thread rearms
        the notification"""
        # Everything the handler touches is set up before the notification
        # is armed.
        self.threaded_notification_called = False
        self.notification_event = threading.Event()

        param = (threaded_notification_handler_rearm, self)
        self.mq.request_notification(param)

        # Re-arm several times.
        for i in range(10):
            self.mq.send('')

            # I have to wait on a shared event to ensure that the
            # notification handler's thread gets a chance to execute before
            # I make my assertion.
            self.notification_event.wait(5)

            self.assertTrue(self.threaded_notification_called)

            self.mq.receive()

            # Reset both the flag and the event so that the next pass only
            # succeeds if the rearmed notification really fires again.
            self.threaded_notification_called = False
            self.notification_event.clear()
467
468
@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support")
class TestMessageQueueDestruction(MessageQueueTestBase):
    """exercise close() and unlink()"""

    def test_close_and_unlink(self):
        """tests that mq.close() and mq.unlink() work"""
        # mq.close() is hard to test on its own. All I can think of to do is
        # call it and note that it does not fail. Also, it allows
        # mq.unlink() to tell the OS to delete the message queue entirely,
        # so it makes sense to test them together.
        name = self.mq.name
        self.mq.unlink()
        self.mq.close()

        # Wipe this out so that self.tearDown() doesn't crash. This is done
        # before the assertion so that tearDown() is also skipped when the
        # assertion fails.
        self.mq = None

        self.assertRaises(posix_ipc.ExistentialError, posix_ipc.MessageQueue,
                          name)
487
488
@skipUnless(posix_ipc.MESSAGE_QUEUES_SUPPORTED, "Requires MessageQueue support")
class TestMessageQueuePropertiesAndAttributes(MessageQueueTestBase):
    """Exercise props and attrs"""
    @staticmethod
    def _close_and_unlink(mq):
        """Close mq and unlink it; the unlink happens even if close() raises."""
        try:
            mq.close()
        finally:
            mq.unlink()

    def test_property_name(self):
        """exercise MessageQueue.name"""
        self.assertGreaterEqual(len(self.mq.name), 2)

        self.assertEqual(self.mq.name[0], '/')

        self.assertWriteToReadOnlyPropertyFails('name', 'hello world')

    def test_property_mqd(self):
        """exercise MessageQueue.mqd"""
        self.assertNotEqual(self.mq.mqd, -1)

        # The mqd is of type mqd_t. I can't find doc that states what this
        # type is. All I know is that -1 is an error so it's probably
        # int-ish, but I can't tell exactly what to expect.
        self.assertWriteToReadOnlyPropertyFails('mqd', 42)

    def test_property_max_messages(self):
        """exercise MessageQueue.max_messages"""
        self.assertGreaterEqual(self.mq.max_messages, 0)

        self.assertWriteToReadOnlyPropertyFails('max_messages', 42)

    def test_property_max_message_size(self):
        """exercise MessageQueue.max_message_size"""
        self.assertGreaterEqual(self.mq.max_message_size, 0)

        self.assertWriteToReadOnlyPropertyFails('max_message_size', 42)

    def test_property_current_messages(self):
        """exercise MessageQueue.current_messages"""
        self.assertEqual(self.mq.current_messages, 0)

        self.mq.send('')
        self.assertEqual(self.mq.current_messages, 1)
        for _ in range(3):
            self.mq.send('')
        self.assertEqual(self.mq.current_messages, 4)

        # Each receive() should bring the count down by exactly one.
        for expected in (3, 2, 1, 0):
            self.mq.receive()
            self.assertEqual(self.mq.current_messages, expected)

        self.assertWriteToReadOnlyPropertyFails('current_messages', 42)

    def test_block_flag_default_value_and_writability(self):
        """test that the block flag is True by default and can be changed"""
        mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX)
        try:
            self.assertTrue(mq.block)

            # Toggle. I expect no errors. It's good to test this on a queue
            # that's only used for this particular test. I would rather have
            # all the other tests execute using the default value of block
            # on self.mq.
            mq.block = False
            mq.block = True
        finally:
            # Remove the private queue even if something above failed.
            self._close_and_unlink(mq)

    def test_block_flag_false(self):
        """test blocking behavior when flag is false"""
        mq = posix_ipc.MessageQueue(None, posix_ipc.O_CREX, max_messages=3)
        try:
            mq.block = False

            # Queue is empty, so receive() should immediately raise BusyError
            start = time.time()
            self.assertRaises(posix_ipc.BusyError, mq.receive, 10)
            elapsed = time.time() - start

            # Not only should receive() have raised BusyError, it should have
            # done so "immediately". I don't insist on extreme precision since
            # OS-level task switching might mean that elapsed time is not
            # vanishingly small as one might expect under most circumstances.
            self.assertLess(elapsed, 1.0)

            # Now test send() the same way.
            mq.send(' ')
            mq.send(' ')
            mq.send(' ')

            # Queue is now full.
            start = time.time()
            self.assertRaises(posix_ipc.BusyError, mq.send, ' ', 10)
            elapsed = time.time() - start
            self.assertLess(elapsed, 1.0)
        finally:
            # Remove the private queue even if something above failed.
            self._close_and_unlink(mq)
588
589
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
592