1#!/usr/local/bin/python3.8
2# -*- coding: utf-8 -*-
3
4# Copyright (c) 2019 Grand Joldes (grandwork2@yahoo.com).
5#
6# This file is Copyright (c) 2019 by the GPSD project
7#
8# SPDX-License-Identifier: BSD-2-clause
9
10# This code run compatibly under Python 3.x for x >= 6.
11
12"""aiogps.py -- Asyncio Python interface to GPSD.
13
14This module adds asyncio support to the Python gps interface. It runs on
15Python versions >= 3.6 and provides the following benefits:
16    - easy integration in asyncio applications (all I/O operations done through
17        non-blocking coroutines, async context manager, async iterator);
18    - support for cancellation (all operations are cancellable);
19    - support for timeouts (on both read and connect);
20    - support for connection keep-alive (using the TCP keep alive mechanism)
21    - support for automatic re-connection;
22    - configurable connection parameters;
23    - configurable exeption handling (internally or by application);
24    - logging support (logger name: 'gps.aiogps').
25
26The use of timeouts, keepalive and automatic reconnection make possible easy
27handling of GPSD connections over unreliable networks.
28
29Examples:
30    import logging
31    import gps.aiogps
32
33    # configuring logging
34    logging.basicConfig()
35    logging.root.setLevel(logging.INFO)
36    # Example of setting up logging level for the aiogps logger
37    logging.getLogger('gps.aiogps').setLevel(logging.ERROR)
38
39    # using default parameters
40    async with gps.aiogps.aiogps() as gpsd:
41        async for msg in gpsd:
42            # Log last message
43            logging.info(f'Received: {msg}')
44            # Log updated GPS status
45            logging.info(f'\nGPS status:\n{gpsd}')
46
47    # using custom parameters
48    try:
49        async with gps.aiogps.aiogps(
50                connection_args = {
51                    'host': '192.168.10.116',
52                    'port': 2947
53                },
54                connection_timeout = 5,
55                reconnect = 0,   # do not try to reconnect, raise exceptions
56                alive_opts = {
57                    'rx_timeout': 5
58                }
59            ) as gpsd:
60            async for msg in gpsd:
61                logging.info(msg)
62    except asyncio.CancelledError:
63        return
64    except asyncio.IncompleteReadError:
65        logging.info('Connection closed by server')
66    except asyncio.TimeoutError:
67        logging.error('Timeout waiting for gpsd to respond')
68    except Exception as exc:
69        logging.error(f'Error: {exc}')
70
71"""
72
73__all__ = ['aiogps', ]
74
75import logging
76import asyncio
77import socket
78from typing import Optional, Union, Awaitable
79
80from .client import gpsjson, dictwrapper
81from .gps import gps, gpsdata, WATCH_ENABLE, PACKET_SET
82from .misc import polystr, polybytes
83
84
85class aiogps(gps):  # pylint: disable=R0902
86    """An asyncio gps client.
87
88    Reimplements all gps IO methods using asyncio coros. Adds connection
89    management, an asyncio context manager and an asyncio iterator.
90
91    The class uses a logger named 'gps.aiogps' to record events. The logger is
92    configured with a NullHandler to disable any message logging until the
93    application configures another handler.
94    """
95
96    def __init__(self,  # pylint: disable=W0231
97                 connection_args: Optional[dict] = None,
98                 connection_timeout: Optional[float] = None,
99                 reconnect: Optional[float] = 2,
100                 alive_opts: Optional[dict] = None) -> None:
101        """
102        Arguments:
103            connection_args: arguments needed for opening a connection.
104                These will be passed directly to asyncio.open_connection.
105                If set to None, a connection to the default gps host and port
106                will be attempded.
107            connection_timeout: time to wait for a connection to complete
108                (seconds). Set to None to disable.
109            reconnect: configures automatic reconnections:
110                - 0: reconnection is not attempted in case of an error and the
111                error is raised to the user;
112                - number > 0: delay until next reconnection attempt (seconds).
113            alive_opts: options related to detection of disconnections.
114                Two mecanisms are supported: TCP keepalive (default, may not be
115                available on all platforms) and Rx timeout, through the
116                following options:
117                - rx_timeout: Rx timeout (seconds). Set to None to disable.
118                - SO_KEEPALIVE: socket keepalive and related parameters:
119                - TCP_KEEPIDLE
120                - TCP_KEEPINTVL
121                - TCP_KEEPCNT
122        """
123        # If connection_args are not specified use defaults
124        self.connection_args = connection_args or {
125            'host': self.host,
126            'port': self.port
127        }
128        self.connection_timeout = connection_timeout
129        assert reconnect >= 0
130        self.reconnect = reconnect
131        # If alive_opts are not specified use defaults
132        self.alive_opts = alive_opts or {
133            'rx_timeout': None,
134            'SO_KEEPALIVE': 1,
135            'TCP_KEEPIDLE': 2,
136            'TCP_KEEPINTVL': 2,
137            'TCP_KEEPCNT': 3
138        }
139        # Connection access streams
140        self.reader: Optional[asyncio.StreamReader] = None
141        self.writer: Optional[asyncio.StreamWriter] = None
142        # Set up logging
143        self.logger = logging.getLogger(__name__)
144        # Set the Null handler - prevents logging message handling unless the
145        # application sets up a handler.
146        self.logger.addHandler(logging.NullHandler())
147        # Init gps parents
148        gpsdata.__init__(self)  # pylint: disable=W0233
149        gpsjson.__init__(self)  # pylint: disable=W0233
150        # Provide the response in both 'str' and 'bytes' form
151        self.bresponse = b''
152        self.response = polystr(self.bresponse)
153        # Default stream command
154        self.stream_command = self.generate_stream_command(WATCH_ENABLE)
155        self.loop = self.connection_args.get('loop', asyncio.get_event_loop())
156
157    def __del__(self) -> None:
158        """ Destructor """
159        self.close()
160
161    async def _open_connection(self) -> None:
162        """
163        Opens a connection to the GPSD server and configures the TCP socket.
164        """
165        self.logger.info(
166            f"Connecting to gpsd at {self.connection_args['host']}" +
167            (f":{self.connection_args['port']}"
168             if self.connection_args['port'] else ''))
169        self.reader, self.writer = await asyncio.wait_for(
170            asyncio.open_connection(**self.connection_args),
171            self.connection_timeout,
172            loop=self.loop)
173        # Set socket options
174        sock = self.writer.get_extra_info('socket')
175        if sock is not None:
176            if 'SO_KEEPALIVE' in self.alive_opts:
177                sock.setsockopt(socket.SOL_SOCKET,
178                                socket.SO_KEEPALIVE,
179                                self.alive_opts['SO_KEEPALIVE'])
180            if hasattr(
181                    sock,
182                    'TCP_KEEPIDLE') and 'TCP_KEEPIDLE' in self.alive_opts:
183                sock.setsockopt(socket.IPPROTO_TCP,
184                                socket.TCP_KEEPIDLE,    # pylint: disable=E1101
185                                self.alive_opts['TCP_KEEPIDLE'])
186            if hasattr(
187                    sock,
188                    'TCP_KEEPINTVL') and 'TCP_KEEPINTVL' in self.alive_opts:
189                sock.setsockopt(socket.IPPROTO_TCP,
190                                socket.TCP_KEEPINTVL,   # pylint: disable=E1101
191                                self.alive_opts['TCP_KEEPINTVL'])
192            if hasattr(
193                    sock,
194                    'TCP_KEEPCNT') and 'TCP_KEEPCNT' in self.alive_opts:
195                sock.setsockopt(socket.IPPROTO_TCP,
196                                socket.TCP_KEEPCNT,
197                                self.alive_opts['TCP_KEEPCNT'])
198
199    def close(self) -> None:
200        """ Closes connection to GPSD server """
201        if self.writer:
202            try:
203                self.writer.close()
204            except Exception:   # pylint: disable=W0703
205                pass
206            self.writer = None
207
208    def waiting(self) -> bool:   # pylint: disable=W0221
209        """ Mask the blocking waiting method from gpscommon """
210        return True
211
212    async def read(self) -> Union[dictwrapper, str]:
213        """ Reads data from GPSD server """
214        while True:
215            await self.connect()
216            try:
217                rx_timeout = self.alive_opts.get('rx_timeout', None)
218                reader = self.reader.readuntil(separator=b'\n')
219                self.bresponse = await asyncio.wait_for(reader,
220                                                        rx_timeout,
221                                                        loop=self.loop)
222                self.response = polystr(self.bresponse)
223                if self.response.startswith(
224                        "{") and self.response.endswith("}\r\n"):
225                    self.unpack(self.response)
226                    self._oldstyle_shim()
227                    self.valid |= PACKET_SET
228                    return self.data
229                return self.response
230            except asyncio.CancelledError:
231                self.close()
232                raise
233            except Exception as exc:    # pylint: disable=W0703
234                error = 'timeout' if isinstance(
235                    exc, asyncio.TimeoutError) else exc
236                self.logger.warning(
237                    f'Failed to get message from GPSD: {error}')
238                self.close()
239                if self.reconnect:
240                    # Try again later
241                    await asyncio.sleep(self.reconnect)
242                else:
243                    raise
244
245    async def connect(self) -> None:    # pylint: disable=W0221
246        """ Connects to GPSD server and starts streaming data """
247        while not self.writer:
248            try:
249                await self._open_connection()
250                await self.stream()
251                self.logger.info('Connected to gpsd')
252            except asyncio.CancelledError:
253                self.close()
254                raise
255            except Exception as exc:    # pylint: disable=W0703
256                error = 'timeout' if isinstance(
257                    exc, asyncio.TimeoutError) else exc
258                self.logger.error(f'Failed to connect to GPSD: {error}')
259                self.close()
260                if self.reconnect:
261                    # Try again later
262                    await asyncio.sleep(self.reconnect)
263                else:
264                    raise
265
266    async def send(self, commands) -> None:
267        """ Sends commands """
268        bcommands = polybytes(commands + "\n")
269        if self.writer:
270            self.writer.write(bcommands)
271            await self.writer.drain()
272
273    async def stream(self, flags: Optional[int] = 0,
274                     devpath: Optional[str] = None) -> None:
275        """ Creates and sends the stream command """
276        if flags > 0:
277            # Update the stream command
278            self.stream_command = self.generate_stream_command(flags, devpath)
279
280        if self.stream_command:
281            self.logger.info(f'Sent stream as: {self.stream_command}')
282            await self.send(self.stream_command)
283        else:
284            raise TypeError(f'Invalid streaming command: {flags}')
285
286    async def __aenter__(self) -> 'aiogps':
287        """ Context manager entry """
288        return self
289
290    async def __aexit__(self, exc_type, exc, traceback) -> None:
291        """ Context manager exit: close connection """
292        self.close()
293
294    def __aiter__(self) -> 'aiogps':
295        """ Async iterator interface """
296        return self
297
298    async def __anext__(self) -> Union[dictwrapper, str]:
299        """ Returns next message from GPSD """
300        data = await self.read()
301        return data
302
303    def __next__(self) -> Awaitable:
304        """
305        Reimplementation of the blocking iterator from gps.
306        Returns an awaitable which returns the next message from GPSD.
307        """
308        return self.read()
309