#!/usr/bin/env python
#
# This file is part of pySerial - Cross platform serial port support for Python
# (C) 2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Test cancel functionality.
"""
import sys
import unittest
import threading
import time
import serial

# on which port should the tests be performed:
PORT = 'loop://'


@unittest.skipUnless(hasattr(serial.Serial, 'cancel_read'), "cancel_read not supported on platform")
class TestCancelRead(unittest.TestCase):
    """Test cancel_read functionality"""

    def setUp(self):
        # get the port under test; the tests read from it right away, without
        # calling open() themselves
        self.s = serial.serial_for_url(PORT)
        self.assertTrue(hasattr(self.s, 'cancel_read'), "serial instance has no cancel_read")
        # much longer than the cancel delay, so that a read which is *not*
        # cancelled is clearly distinguishable by its duration
        self.s.timeout = 10
        self.cancel_called = 0

    def tearDown(self):
        self.s.reset_output_buffer()
        self.s.close()

    def _cancel(self, num_times):
        """Call cancel_read() num_times, counting each call in cancel_called."""
        for _ in range(num_times):
            #~ print "cancel"
            self.cancel_called += 1
            self.s.cancel_read()

    def test_cancel_once(self):
        """Cancel read

        A timer calls cancel_read() once after 1 second while the main thread
        is blocked in read() with a 10 second timeout. The read has to return
        between 0.5 and 2.5 seconds after it was started.
        """
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        start = time.time()
        try:
            self.s.read(1000)
            elapsed = time.time() - start
        finally:
            # never leave the timer thread behind: it could otherwise fire
            # after tearDown has already closed the port
            timer.cancel()
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        self.assertTrue(0.5 < elapsed < 2.5, 'Function did not return in time: {}'.format(elapsed))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    #~ def test_cancel_before_read(self):
        #~ self.s.cancel_read()
        #~ self.s.read()


DATA = b'#' * 1024


@unittest.skipUnless(hasattr(serial.Serial, 'cancel_write'), "cancel_write not supported on platform")
class TestCancelWrite(unittest.TestCase):
    """Test cancel_write functionality"""

    def setUp(self):
        # get the port under test; the tests write to it right away, without
        # calling open() themselves
        self.s = serial.serial_for_url(PORT, baudrate=300)  # extra slow ~30B/s => 1kb ~ 34s
        self.assertTrue(hasattr(self.s, 'cancel_write'), "serial instance has no cancel_write")
        # much longer than the cancel delay, so that a write which is *not*
        # cancelled is clearly distinguishable by its duration
        self.s.write_timeout = 10
        self.cancel_called = 0

    def tearDown(self):
        self.s.reset_output_buffer()
        # not all USB-Serial adapters will actually flush the output (maybe
        # keeping the buffer in the MCU in the adapter) therefore, speed up by
        # changing the baudrate
        self.s.baudrate = 115200
        self.s.flush()
        self.s.close()

    def _cancel(self, num_times):
        """Call cancel_write() num_times, counting each call in cancel_called."""
        for _ in range(num_times):
            self.cancel_called += 1
            self.s.cancel_write()

    def test_cancel_once(self):
        """Cancel write

        A timer calls cancel_write() once after 1 second while the main thread
        is blocked in write() with a 10 second write timeout. The write has to
        return between 0.5 and 2.5 seconds after it was started.
        """
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        start = time.time()
        try:
            self.s.write(DATA)
            elapsed = time.time() - start
        finally:
            # never leave the timer thread behind: it could otherwise fire
            # after tearDown has already closed the port
            timer.cancel()
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        self.assertTrue(0.5 < elapsed < 2.5, 'Function did not return in time: {}'.format(elapsed))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    #~ def test_cancel_before_write(self):
        #~ self.s.cancel_write()
        #~ self.s.write(DATA)
        #~ self.s.reset_output_buffer()


if __name__ == '__main__':
    sys.stdout.write(__doc__)
    # an optional first command line argument selects the port to test
    if len(sys.argv) > 1:
        PORT = sys.argv[1]
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # replace the arguments so that unittest runs verbosely and does not try
    # to interpret the port name as a test name
    sys.argv[1:] = ['-v']
    # When this module is executed from the command-line, it runs all its tests
    unittest.main()