1#! python
2#
# This module implements a special URL handler that wraps another port and
# prints the traffic for debugging purposes. With this, it is possible
# to debug the serial port traffic of every application that uses
# serial_for_url.
7#
8# This file is part of pySerial. https://github.com/pyserial/pyserial
9# (C) 2015 Chris Liechti <cliechti@gmx.net>
10#
11# SPDX-License-Identifier:    BSD-3-Clause
12#
# URL format:    spy://port[?option[=value][&option[=value]]]
# options:
# - file=X  a file or device to write to (default: sys.stderr)
# - color   use escape code to colorize output
# - raw     forward raw bytes instead of hexdump
# - all     also show empty reads and in_waiting queries
#
# example:
#   redirect output to another terminal window on Posix (Linux):
#   python -m serial.tools.miniterm spy:///dev/ttyUSB0?file=/dev/pts/14\&color
22
23from __future__ import absolute_import
24
25import sys
26import time
27
28import serial
29from serial.serialutil import  to_bytes
30
31try:
32    import urlparse
33except ImportError:
34    import urllib.parse as urlparse
35
36
def sixteen(data):
    """\
    Generate (hex, ASCII) string pairs for a hex dump, 16 bytes per row.

    For every byte of data one pair is yielded: the two digit upper case hex
    value followed by a space, and the character itself if it is printable
    ASCII (space up to, but not including, 0x7f) or '.' otherwise.
    After the 8th byte of a row an extra (' ', '') pair is yielded to widen
    the gap in the hex column. A row is terminated with (None, None); an
    incomplete last row is first padded with blank cells to the full width
    of 16. Empty data yields nothing at all.
    """
    column = 0
    # ord()/decode() below rely on iterbytes handing out single-byte items
    for byte in serial.iterbytes(data):
        hex_cell = '{:02X} '.format(ord(byte))
        if b' ' <= byte < b'\x7f':
            char_cell = byte.decode('ascii')
        else:
            char_cell = '.'
        yield (hex_cell, char_cell)
        column += 1
        if column == 8:
            yield (' ', '')
        elif column >= 16:
            yield (None, None)
            column = 0
    if column > 0:
        # fill up the remaining cells of the unfinished row
        for position in range(column + 1, 17):
            if position == 8:
                yield (' ', '')
            yield ('   ', ' ')
        yield (None, None)
58
59
def hexdump(data):
    """\
    Generate (offset, text) tuples, one for each row of the hex dump of data.

    The cells delivered by sixteen() are collected until its end-of-row
    marker (None, None) shows up. Then the joined hex column and the joined
    ASCII column, separated by one space, are yielded together with the
    offset of the row. The offset starts at 0 and advances by 0x10 per row.
    """
    hex_cells = []
    char_cells = []
    offset = 0
    for hex_cell, char_cell in sixteen(data):
        if hex_cell is not None:
            hex_cells.append(hex_cell)
            char_cells.append(char_cell)
            continue
        # end of row: emit what was collected and start over
        hex_column = ''.join(hex_cells)
        char_column = ''.join(char_cells)
        yield (offset, ' '.join([hex_column, char_column]))
        hex_cells = []
        char_cells = []
        offset += 0x10
74
75
class FormatRaw(object):
    """\
    Forward only RX and TX data to output.

    The payload is passed on unmodified. When color is enabled, an ANSI
    escape sequence is written in front of each chunk (green for RX, red for
    TX); the color is not reset afterwards. Control calls are not shown.

    output is a file-like object. It may be a text stream (such as
    sys.stderr or a file opened with mode 'w') or a binary stream.
    """

    def __init__(self, output, color):
        self.output = output
        self.color = color
        self.rx_color = '\x1b[32m'
        self.tx_color = '\x1b[31m'

    def _write_data(self, data):
        """write the payload to output without tripping over text streams"""
        if isinstance(data, str):
            # Python 2 (where bytes is str) or text: can be written as is
            self.output.write(data)
            return
        binary = getattr(self.output, 'buffer', None)
        if binary is not None:
            # Text stream on top of a binary one (Python 3): writing bytes
            # to the text layer raises TypeError, so use the binary layer.
            # Flush first so that a color code written just before the
            # payload is not overtaken by it.
            self.output.flush()
            binary.write(data)
            return
        try:
            self.output.write(data)
        except TypeError:
            # Text-only stream (e.g. io.StringIO): latin-1 maps each byte
            # to exactly one character, nothing gets lost or raises.
            self.output.write(bytes(data).decode('latin-1'))

    def rx(self, data):
        """show received data"""
        if self.color:
            self.output.write(self.rx_color)
        self._write_data(data)
        self.output.flush()

    def tx(self, data):
        """show transmitted data"""
        if self.color:
            self.output.write(self.tx_color)
        self._write_data(data)
        self.output.flush()

    def control(self, name, value):
        """(do not) show control calls"""
102
103
class FormatHexdump(object):
    """\
    Create a hex dump of RX and TX data, show when control lines are read or
    written.

    Each line starts with the time in seconds since the formatter was
    created, followed by a label and the value. Data lines additionally
    carry the offset of the row within the chunk that was read or written.

    output example (rows are padded to the full width of 16 bytes)::

        000000.000 Q-RX reset_input_buffer
        000002.469 RTS  inactive
        000002.773 RTS  active
        000003.001 TX   0000  48 45 4C 4C 4F                     HELLO
        000003.102 RX   0000  48 45 4C 4C 4F                     HELLO

    When color is enabled, an ANSI escape sequence is written before the
    lines of each call (green for RX, red for TX, white for control).
    """

    def __init__(self, output, color):
        self.output = output
        self.color = color
        self.rx_color = '\x1b[32m'
        self.tx_color = '\x1b[31m'
        self.control_color = '\x1b[37m'
        # reference point for the time stamps in the first column
        self.start_time = time.time()

    def write_line(self, timestamp, label, value, value2=''):
        """write one formatted line to the output and flush it"""
        line = '{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2)
        self.output.write(line)
        self.output.flush()

    def rx(self, data):
        """show received data as hex dump, '<empty>' if there was none"""
        if self.color:
            self.output.write(self.rx_color)
        if not data:
            self.write_line(time.time() - self.start_time, 'RX', '<empty>')
            return
        for offset, row in hexdump(data):
            self.write_line(time.time() - self.start_time, 'RX', '{:04X}  '.format(offset), row)

    def tx(self, data):
        """show transmitted data as hex dump"""
        if self.color:
            self.output.write(self.tx_color)
        for offset, row in hexdump(data):
            self.write_line(time.time() - self.start_time, 'TX', '{:04X}  '.format(offset), row)

    def control(self, name, value):
        """show control calls"""
        if self.color:
            self.output.write(self.control_color)
        self.write_line(time.time() - self.start_time, name, value)
153
154
class Serial(serial.Serial):
    """\
    Inherit the native Serial port implementation and wrap all the methods and
    attributes.

    Data that is read or written and accesses to the control lines are
    reported to a formatter object before/after they are passed on to the
    native implementation. The formatter is created by from_url() when the
    port is set, the port has to be given as
    spy://port[?option[=value][&option[=value]]].
    """
    # pylint: disable=no-member

    def __init__(self, *args, **kwargs):
        # The defaults have to be in place before the base class is
        # initialized: when a port is passed to the constructor, the base
        # class assigns it through the port setter below, which installs the
        # formatter (and show_all) via from_url(). Setting the defaults
        # afterwards would discard that configuration again.
        self.formatter = None
        self.show_all = False
        super(Serial, self).__init__(*args, **kwargs)

    @serial.Serial.port.setter
    def port(self, value):
        """set the port from a spy:// URL, None is ignored"""
        if value is not None:
            serial.Serial.port.__set__(self, self.from_url(value))

    def from_url(self, url):
        """\
        Extract the name of the wrapped port and the options from an URL
        string and set up the formatter accordingly.

        Supported options:

        - file=X  write to this file or device instead of sys.stderr
          (dev=X is accepted as an alias)
        - color   use escape codes to colorize the output
        - raw     forward raw bytes instead of a hex dump
        - all     also show empty reads and in_waiting queries

        Returns the port name (network location plus path of the URL).
        Raises serial.SerialException if the scheme is not spy or an option
        is unknown.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException(
                'expected a string in the form '
                '"spy://port[?option[=value][&option[=value]]]": '
                'not starting with spy:// ({!r})'.format(parts.scheme))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            # keep_blank_values=True: options such as "color" have no value
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option in ('file', 'dev'):
                    # intentionally left open, it is used for as long as
                    # the formatter exists
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise serial.SerialException(
                'expected a string in the form '
                '"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])

    def write(self, tx):
        """show the data, then write it to the port"""
        tx = to_bytes(tx)
        self.formatter.tx(tx)
        return super(Serial, self).write(tx)

    def read(self, size=1):
        """read from the port and show the data (empty reads only with 'all')"""
        rx = super(Serial, self).read(size)
        if rx or self.show_all:
            self.formatter.rx(rx)
        return rx

    # cancel_read/cancel_write are only wrapped when the native
    # implementation provides them
    if hasattr(serial.Serial, 'cancel_read'):
        def cancel_read(self):
            """show and forward the request to cancel a read"""
            self.formatter.control('Q-RX', 'cancel_read')
            super(Serial, self).cancel_read()

    if hasattr(serial.Serial, 'cancel_write'):
        def cancel_write(self):
            """show and forward the request to cancel a write"""
            self.formatter.control('Q-TX', 'cancel_write')
            super(Serial, self).cancel_write()

    @property
    def in_waiting(self):
        """number of bytes in the input buffer, only shown with 'all'"""
        n = super(Serial, self).in_waiting
        if self.show_all:
            self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
        return n

    def flush(self):
        """show and forward flush"""
        self.formatter.control('Q-TX', 'flush')
        super(Serial, self).flush()

    def reset_input_buffer(self):
        """show and forward reset_input_buffer"""
        self.formatter.control('Q-RX', 'reset_input_buffer')
        super(Serial, self).reset_input_buffer()

    def reset_output_buffer(self):
        """show and forward reset_output_buffer"""
        self.formatter.control('Q-TX', 'reset_output_buffer')
        super(Serial, self).reset_output_buffer()

    def send_break(self, duration=0.25):
        """show and forward send_break"""
        self.formatter.control('BRK', 'send_break {}s'.format(duration))
        super(Serial, self).send_break(duration)

    # output lines: the new level is shown before it is applied

    @serial.Serial.break_condition.setter
    def break_condition(self, level):
        self.formatter.control('BRK', 'active' if level else 'inactive')
        serial.Serial.break_condition.__set__(self, level)

    @serial.Serial.rts.setter
    def rts(self, level):
        self.formatter.control('RTS', 'active' if level else 'inactive')
        serial.Serial.rts.__set__(self, level)

    @serial.Serial.dtr.setter
    def dtr(self, level):
        self.formatter.control('DTR', 'active' if level else 'inactive')
        serial.Serial.dtr.__set__(self, level)

    # input lines: the level is read from the port, shown and returned

    @serial.Serial.cts.getter
    def cts(self):
        level = super(Serial, self).cts
        self.formatter.control('CTS', 'active' if level else 'inactive')
        return level

    @serial.Serial.dsr.getter
    def dsr(self):
        level = super(Serial, self).dsr
        self.formatter.control('DSR', 'active' if level else 'inactive')
        return level

    @serial.Serial.ri.getter
    def ri(self):
        level = super(Serial, self).ri
        self.formatter.control('RI', 'active' if level else 'inactive')
        return level

    @serial.Serial.cd.getter
    def cd(self):
        level = super(Serial, self).cd
        self.formatter.control('CD', 'active' if level else 'inactive')
        return level
285
286# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':
    # simple manual check: create an unconfigured instance, then assign the
    # URL so that it passes through the port setter, and show the result
    spy_port = Serial(None)
    spy_port.port = 'spy:///dev/ttyS0'
    print(spy_port)
291