1#! python 2# 3# This module implements a special URL handler that wraps an other port, 4# print the traffic for debugging purposes. With this, it is possible 5# to debug the serial port traffic on every application that uses 6# serial_for_url. 7# 8# This file is part of pySerial. https://github.com/pyserial/pyserial 9# (C) 2015 Chris Liechti <cliechti@gmx.net> 10# 11# SPDX-License-Identifier: BSD-3-Clause 12# 13# URL format: spy://port[?option[=value][&option[=value]]] 14# options: 15# - dev=X a file or device to write to 16# - color use escape code to colorize output 17# - raw forward raw bytes instead of hexdump 18# 19# example: 20# redirect output to an other terminal window on Posix (Linux): 21# python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color 22 23from __future__ import absolute_import 24 25import sys 26import time 27 28import serial 29from serial.serialutil import to_bytes 30 31try: 32 import urlparse 33except ImportError: 34 import urllib.parse as urlparse 35 36 37def sixteen(data): 38 """\ 39 yield tuples of hex and ASCII display in multiples of 16. Includes a 40 space after 8 bytes and (None, None) after 16 bytes and at the end. 41 """ 42 n = 0 43 for b in serial.iterbytes(data): 44 yield ('{:02X} '.format(ord(b)), b.decode('ascii') if b' ' <= b < b'\x7f' else '.') 45 n += 1 46 if n == 8: 47 yield (' ', '') 48 elif n >= 16: 49 yield (None, None) 50 n = 0 51 if n > 0: 52 while n < 16: 53 n += 1 54 if n == 8: 55 yield (' ', '') 56 yield (' ', ' ') 57 yield (None, None) 58 59 60def hexdump(data): 61 """yield lines with hexdump of data""" 62 values = [] 63 ascii = [] 64 offset = 0 65 for h, a in sixteen(data): 66 if h is None: 67 yield (offset, ' '.join([''.join(values), ''.join(ascii)])) 68 del values[:] 69 del ascii[:] 70 offset += 0x10 71 else: 72 values.append(h) 73 ascii.append(a) 74 75 76class FormatRaw(object): 77 """Forward only RX and TX data to output.""" 78 79 def __init__(self, output, color): 80 self.output = output 81 self.color = color 82 self.rx_color = '\x1b[32m' 83 self.tx_color = '\x1b[31m' 84 85 def rx(self, data): 86 """show received data""" 87 if self.color: 88 self.output.write(self.rx_color) 89 self.output.write(data) 90 self.output.flush() 91 92 def tx(self, data): 93 """show transmitted data""" 94 if self.color: 95 self.output.write(self.tx_color) 96 self.output.write(data) 97 self.output.flush() 98 99 def control(self, name, value): 100 """(do not) show control calls""" 101 pass 102 103 104class FormatHexdump(object): 105 """\ 106 Create a hex dump of RX ad TX data, show when control lines are read or 107 written. 108 109 output example:: 110 111 000000.000 Q-RX flushInput 112 000002.469 RTS inactive 113 000002.773 RTS active 114 000003.001 TX 48 45 4C 4C 4F HELLO 115 000003.102 RX 48 45 4C 4C 4F HELLO 116 117 """ 118 119 def __init__(self, output, color): 120 self.start_time = time.time() 121 self.output = output 122 self.color = color 123 self.rx_color = '\x1b[32m' 124 self.tx_color = '\x1b[31m' 125 self.control_color = '\x1b[37m' 126 127 def write_line(self, timestamp, label, value, value2=''): 128 self.output.write('{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2)) 129 self.output.flush() 130 131 def rx(self, data): 132 """show received data as hex dump""" 133 if self.color: 134 self.output.write(self.rx_color) 135 if data: 136 for offset, row in hexdump(data): 137 self.write_line(time.time() - self.start_time, 'RX', '{:04X} '.format(offset), row) 138 else: 139 self.write_line(time.time() - self.start_time, 'RX', '<empty>') 140 141 def tx(self, data): 142 """show transmitted data as hex dump""" 143 if self.color: 144 self.output.write(self.tx_color) 145 for offset, row in hexdump(data): 146 self.write_line(time.time() - self.start_time, 'TX', '{:04X} '.format(offset), row) 147 148 def control(self, name, value): 149 """show control calls""" 150 if self.color: 151 self.output.write(self.control_color) 152 self.write_line(time.time() - self.start_time, name, value) 153 154 155class Serial(serial.Serial): 156 """\ 157 Inherit the native Serial port implementation and wrap all the methods and 158 attributes. 159 """ 160 # pylint: disable=no-member 161 162 def __init__(self, *args, **kwargs): 163 super(Serial, self).__init__(*args, **kwargs) 164 self.formatter = None 165 self.show_all = False 166 167 @serial.Serial.port.setter 168 def port(self, value): 169 if value is not None: 170 serial.Serial.port.__set__(self, self.from_url(value)) 171 172 def from_url(self, url): 173 """extract host and port from an URL string""" 174 parts = urlparse.urlsplit(url) 175 if parts.scheme != 'spy': 176 raise serial.SerialException( 177 'expected a string in the form ' 178 '"spy://port[?option[=value][&option[=value]]]": ' 179 'not starting with spy:// ({!r})'.format(parts.scheme)) 180 # process options now, directly altering self 181 formatter = FormatHexdump 182 color = False 183 output = sys.stderr 184 try: 185 for option, values in urlparse.parse_qs(parts.query, True).items(): 186 if option == 'file': 187 output = open(values[0], 'w') 188 elif option == 'color': 189 color = True 190 elif option == 'raw': 191 formatter = FormatRaw 192 elif option == 'all': 193 self.show_all = True 194 else: 195 raise ValueError('unknown option: {!r}'.format(option)) 196 except ValueError as e: 197 raise serial.SerialException( 198 'expected a string in the form ' 199 '"spy://port[?option[=value][&option[=value]]]": {}'.format(e)) 200 self.formatter = formatter(output, color) 201 return ''.join([parts.netloc, parts.path]) 202 203 def write(self, tx): 204 tx = to_bytes(tx) 205 self.formatter.tx(tx) 206 return super(Serial, self).write(tx) 207 208 def read(self, size=1): 209 rx = super(Serial, self).read(size) 210 if rx or self.show_all: 211 self.formatter.rx(rx) 212 return rx 213 214 if hasattr(serial.Serial, 'cancel_read'): 215 def cancel_read(self): 216 self.formatter.control('Q-RX', 'cancel_read') 217 super(Serial, self).cancel_read() 218 219 if hasattr(serial.Serial, 'cancel_write'): 220 def cancel_write(self): 221 self.formatter.control('Q-TX', 'cancel_write') 222 super(Serial, self).cancel_write() 223 224 @property 225 def in_waiting(self): 226 n = super(Serial, self).in_waiting 227 if self.show_all: 228 self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n)) 229 return n 230 231 def flush(self): 232 self.formatter.control('Q-TX', 'flush') 233 super(Serial, self).flush() 234 235 def reset_input_buffer(self): 236 self.formatter.control('Q-RX', 'reset_input_buffer') 237 super(Serial, self).reset_input_buffer() 238 239 def reset_output_buffer(self): 240 self.formatter.control('Q-TX', 'reset_output_buffer') 241 super(Serial, self).reset_output_buffer() 242 243 def send_break(self, duration=0.25): 244 self.formatter.control('BRK', 'send_break {}s'.format(duration)) 245 super(Serial, self).send_break(duration) 246 247 @serial.Serial.break_condition.setter 248 def break_condition(self, level): 249 self.formatter.control('BRK', 'active' if level else 'inactive') 250 serial.Serial.break_condition.__set__(self, level) 251 252 @serial.Serial.rts.setter 253 def rts(self, level): 254 self.formatter.control('RTS', 'active' if level else 'inactive') 255 serial.Serial.rts.__set__(self, level) 256 257 @serial.Serial.dtr.setter 258 def dtr(self, level): 259 self.formatter.control('DTR', 'active' if level else 'inactive') 260 serial.Serial.dtr.__set__(self, level) 261 262 @serial.Serial.cts.getter 263 def cts(self): 264 level = super(Serial, self).cts 265 self.formatter.control('CTS', 'active' if level else 'inactive') 266 return level 267 268 @serial.Serial.dsr.getter 269 def dsr(self): 270 level = super(Serial, self).dsr 271 self.formatter.control('DSR', 'active' if level else 'inactive') 272 return level 273 274 @serial.Serial.ri.getter 275 def ri(self): 276 level = super(Serial, self).ri 277 self.formatter.control('RI', 'active' if level else 'inactive') 278 return level 279 280 @serial.Serial.cd.getter 281 def cd(self): 282 level = super(Serial, self).cd 283 self.formatter.control('CD', 'active' if level else 'inactive') 284 return level 285 286# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 287if __name__ == '__main__': 288 ser = Serial(None) 289 ser.port = 'spy:///dev/ttyS0' 290 print(ser) 291