1#!/usr/bin/env python
2#
3# This file is part of pySerial - Cross platform serial port support for Python
4# (C) 2016 Chris Liechti <cliechti@gmx.net>
5#
6# SPDX-License-Identifier:    BSD-3-Clause
7"""
8Test cancel functionality.
9"""
10import sys
11import unittest
12import threading
13import time
14import serial
15
# Port (device name or URL) the tests run against. When this file is executed
# as a script, the first command-line argument replaces this default.
PORT = "loop://"
18
19
@unittest.skipIf(not hasattr(serial.Serial, 'cancel_read'), "cancel_read not supported on platform")
class TestCancelRead(unittest.TestCase):
    """Test cancel_read functionality.

    A blocking ``read()`` is started on the port named by ``PORT`` while a
    timer thread calls ``cancel_read()``; the read is expected to return
    shortly after the cancel instead of waiting for its timeout.
    """

    def setUp(self):
        """Get the port under test and reset the cancel counter."""
        # serial port instance the test operates on
        self.s = serial.serial_for_url(PORT)
        self.assertTrue(hasattr(self.s, 'cancel_read'), "serial instance has no cancel_read")
        # much longer than the 2.5s limit checked by the test, so a cancel
        # that does not work shows up as a timing failure
        self.s.timeout = 10
        self.cancel_called = 0

    def tearDown(self):
        """Discard pending output and close the port."""
        self.s.reset_output_buffer()
        self.s.close()

    def _cancel(self, num_times):
        """Call ``cancel_read()`` *num_times* times, counting each call."""
        for _ in range(num_times):
            #~ print "cancel"
            # count first, so the counter is already updated when the
            # cancelled read() returns in the main thread
            self.cancel_called += 1
            self.s.cancel_read()

    def test_cancel_once(self):
        """Cancel read

        Start a read that cannot complete (nothing is sent), cancel it after
        one second from a timer thread and check that it returned in time.
        """
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            t1 = time.time()
            self.s.read(1000)
            t2 = time.time()
        finally:
            # Never let the timer thread outlive the test: if read() raised or
            # returned early it would otherwise fire later and call
            # cancel_read() on a port that tearDown has already closed.
            timer.cancel()
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        # cancel fires after 1s; allow some slack in both directions
        self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)
51
52    #~ def test_cancel_before_read(self):
53        #~ self.s.cancel_read()
54        #~ self.s.read()
55
56
# Payload for the write-cancel test: 1024 '#' bytes, far more than can be sent
# within the test's time window at the slow baud rate the test configures.
DATA = 1024 * b'#'
58
59
@unittest.skipIf(not hasattr(serial.Serial, 'cancel_write'), "cancel_write not supported on platform")
class TestCancelWrite(unittest.TestCase):
    """Test cancel_write functionality.

    A slow, blocking ``write()`` is started on the port named by ``PORT``
    while a timer thread calls ``cancel_write()``; the write is expected to
    return shortly after the cancel instead of running to completion.
    """

    def setUp(self):
        """Get the port under test at a very low baud rate and reset the cancel counter."""
        # serial port instance the test operates on
        self.s = serial.serial_for_url(PORT, baudrate=300)  # extra slow ~30B/s => 1kb ~ 34s
        self.assertTrue(hasattr(self.s, 'cancel_write'), "serial instance has no cancel_write")
        # much longer than the 2.5s limit checked by the test, so a cancel
        # that does not work shows up as a timing failure
        self.s.write_timeout = 10
        self.cancel_called = 0

    def tearDown(self):
        """Discard pending output, drain what is left quickly and close the port."""
        self.s.reset_output_buffer()
        # not all USB-Serial adapters will actually flush the output (maybe
        # keeping the buffer in the MCU in the adapter) therefore, speed up by
        # changing the baudrate
        self.s.baudrate = 115200
        self.s.flush()
        self.s.close()

    def _cancel(self, num_times):
        """Call ``cancel_write()`` *num_times* times, counting each call."""
        for _ in range(num_times):
            # count first, so the counter is already updated when the
            # cancelled write() returns in the main thread
            self.cancel_called += 1
            self.s.cancel_write()

    def test_cancel_once(self):
        """Cancel write

        Start a write that takes far longer than the test window, cancel it
        after one second from a timer thread and check that it returned in time.
        """
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            t1 = time.time()
            self.s.write(DATA)
            t2 = time.time()
        finally:
            # Never let the timer thread outlive the test: if write() raised or
            # returned early it would otherwise fire later and call
            # cancel_write() on a port that tearDown has already closed.
            timer.cancel()
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        # cancel fires after 1s; allow some slack in both directions
        self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)
95
96    #~ def test_cancel_before_write(self):
97        #~ self.s.cancel_write()
98        #~ self.s.write(DATA)
99        #~ self.s.reset_output_buffer()
100
101
if __name__ == '__main__':
    sys.stdout.write(__doc__)
    # an optional first command-line argument replaces the default port
    try:
        PORT = sys.argv[1]
    except IndexError:
        pass
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # drop every user-supplied argument so that unittest only sees the
    # verbose switch
    del sys.argv[1:]
    sys.argv.append('-v')
    # Executed from the command line, this module runs all of its tests
    unittest.main()
110