1#!/usr/local/bin/python3.8 2# -*- coding: utf-8 -*- 3 4# Copyright (c) 2019 Grand Joldes (grandwork2@yahoo.com). 5# 6# This file is Copyright (c) 2019 by the GPSD project 7# 8# SPDX-License-Identifier: BSD-2-clause 9 10# This code run compatibly under Python 3.x for x >= 6. 11 12"""aiogps.py -- Asyncio Python interface to GPSD. 13 14This module adds asyncio support to the Python gps interface. It runs on 15Python versions >= 3.6 and provides the following benefits: 16 - easy integration in asyncio applications (all I/O operations done through 17 non-blocking coroutines, async context manager, async iterator); 18 - support for cancellation (all operations are cancellable); 19 - support for timeouts (on both read and connect); 20 - support for connection keep-alive (using the TCP keep alive mechanism) 21 - support for automatic re-connection; 22 - configurable connection parameters; 23 - configurable exeption handling (internally or by application); 24 - logging support (logger name: 'gps.aiogps'). 25 26The use of timeouts, keepalive and automatic reconnection make possible easy 27handling of GPSD connections over unreliable networks. 28 29Examples: 30 import logging 31 import gps.aiogps 32 33 # configuring logging 34 logging.basicConfig() 35 logging.root.setLevel(logging.INFO) 36 # Example of setting up logging level for the aiogps logger 37 logging.getLogger('gps.aiogps').setLevel(logging.ERROR) 38 39 # using default parameters 40 async with gps.aiogps.aiogps() as gpsd: 41 async for msg in gpsd: 42 # Log last message 43 logging.info(f'Received: {msg}') 44 # Log updated GPS status 45 logging.info(f'\nGPS status:\n{gpsd}') 46 47 # using custom parameters 48 try: 49 async with gps.aiogps.aiogps( 50 connection_args = { 51 'host': '192.168.10.116', 52 'port': 2947 53 }, 54 connection_timeout = 5, 55 reconnect = 0, # do not try to reconnect, raise exceptions 56 alive_opts = { 57 'rx_timeout': 5 58 } 59 ) as gpsd: 60 async for msg in gpsd: 61 logging.info(msg) 62 except asyncio.CancelledError: 63 return 64 except asyncio.IncompleteReadError: 65 logging.info('Connection closed by server') 66 except asyncio.TimeoutError: 67 logging.error('Timeout waiting for gpsd to respond') 68 except Exception as exc: 69 logging.error(f'Error: {exc}') 70 71""" 72 73__all__ = ['aiogps', ] 74 75import logging 76import asyncio 77import socket 78from typing import Optional, Union, Awaitable 79 80from .client import gpsjson, dictwrapper 81from .gps import gps, gpsdata, WATCH_ENABLE, PACKET_SET 82from .misc import polystr, polybytes 83 84 85class aiogps(gps): # pylint: disable=R0902 86 """An asyncio gps client. 87 88 Reimplements all gps IO methods using asyncio coros. Adds connection 89 management, an asyncio context manager and an asyncio iterator. 90 91 The class uses a logger named 'gps.aiogps' to record events. The logger is 92 configured with a NullHandler to disable any message logging until the 93 application configures another handler. 94 """ 95 96 def __init__(self, # pylint: disable=W0231 97 connection_args: Optional[dict] = None, 98 connection_timeout: Optional[float] = None, 99 reconnect: Optional[float] = 2, 100 alive_opts: Optional[dict] = None) -> None: 101 """ 102 Arguments: 103 connection_args: arguments needed for opening a connection. 104 These will be passed directly to asyncio.open_connection. 105 If set to None, a connection to the default gps host and port 106 will be attempded. 107 connection_timeout: time to wait for a connection to complete 108 (seconds). Set to None to disable. 109 reconnect: configures automatic reconnections: 110 - 0: reconnection is not attempted in case of an error and the 111 error is raised to the user; 112 - number > 0: delay until next reconnection attempt (seconds). 113 alive_opts: options related to detection of disconnections. 114 Two mecanisms are supported: TCP keepalive (default, may not be 115 available on all platforms) and Rx timeout, through the 116 following options: 117 - rx_timeout: Rx timeout (seconds). Set to None to disable. 118 - SO_KEEPALIVE: socket keepalive and related parameters: 119 - TCP_KEEPIDLE 120 - TCP_KEEPINTVL 121 - TCP_KEEPCNT 122 """ 123 # If connection_args are not specified use defaults 124 self.connection_args = connection_args or { 125 'host': self.host, 126 'port': self.port 127 } 128 self.connection_timeout = connection_timeout 129 assert reconnect >= 0 130 self.reconnect = reconnect 131 # If alive_opts are not specified use defaults 132 self.alive_opts = alive_opts or { 133 'rx_timeout': None, 134 'SO_KEEPALIVE': 1, 135 'TCP_KEEPIDLE': 2, 136 'TCP_KEEPINTVL': 2, 137 'TCP_KEEPCNT': 3 138 } 139 # Connection access streams 140 self.reader: Optional[asyncio.StreamReader] = None 141 self.writer: Optional[asyncio.StreamWriter] = None 142 # Set up logging 143 self.logger = logging.getLogger(__name__) 144 # Set the Null handler - prevents logging message handling unless the 145 # application sets up a handler. 146 self.logger.addHandler(logging.NullHandler()) 147 # Init gps parents 148 gpsdata.__init__(self) # pylint: disable=W0233 149 gpsjson.__init__(self) # pylint: disable=W0233 150 # Provide the response in both 'str' and 'bytes' form 151 self.bresponse = b'' 152 self.response = polystr(self.bresponse) 153 # Default stream command 154 self.stream_command = self.generate_stream_command(WATCH_ENABLE) 155 self.loop = self.connection_args.get('loop', asyncio.get_event_loop()) 156 157 def __del__(self) -> None: 158 """ Destructor """ 159 self.close() 160 161 async def _open_connection(self) -> None: 162 """ 163 Opens a connection to the GPSD server and configures the TCP socket. 164 """ 165 self.logger.info( 166 f"Connecting to gpsd at {self.connection_args['host']}" + 167 (f":{self.connection_args['port']}" 168 if self.connection_args['port'] else '')) 169 self.reader, self.writer = await asyncio.wait_for( 170 asyncio.open_connection(**self.connection_args), 171 self.connection_timeout, 172 loop=self.loop) 173 # Set socket options 174 sock = self.writer.get_extra_info('socket') 175 if sock is not None: 176 if 'SO_KEEPALIVE' in self.alive_opts: 177 sock.setsockopt(socket.SOL_SOCKET, 178 socket.SO_KEEPALIVE, 179 self.alive_opts['SO_KEEPALIVE']) 180 if hasattr( 181 sock, 182 'TCP_KEEPIDLE') and 'TCP_KEEPIDLE' in self.alive_opts: 183 sock.setsockopt(socket.IPPROTO_TCP, 184 socket.TCP_KEEPIDLE, # pylint: disable=E1101 185 self.alive_opts['TCP_KEEPIDLE']) 186 if hasattr( 187 sock, 188 'TCP_KEEPINTVL') and 'TCP_KEEPINTVL' in self.alive_opts: 189 sock.setsockopt(socket.IPPROTO_TCP, 190 socket.TCP_KEEPINTVL, # pylint: disable=E1101 191 self.alive_opts['TCP_KEEPINTVL']) 192 if hasattr( 193 sock, 194 'TCP_KEEPCNT') and 'TCP_KEEPCNT' in self.alive_opts: 195 sock.setsockopt(socket.IPPROTO_TCP, 196 socket.TCP_KEEPCNT, 197 self.alive_opts['TCP_KEEPCNT']) 198 199 def close(self) -> None: 200 """ Closes connection to GPSD server """ 201 if self.writer: 202 try: 203 self.writer.close() 204 except Exception: # pylint: disable=W0703 205 pass 206 self.writer = None 207 208 def waiting(self) -> bool: # pylint: disable=W0221 209 """ Mask the blocking waiting method from gpscommon """ 210 return True 211 212 async def read(self) -> Union[dictwrapper, str]: 213 """ Reads data from GPSD server """ 214 while True: 215 await self.connect() 216 try: 217 rx_timeout = self.alive_opts.get('rx_timeout', None) 218 reader = self.reader.readuntil(separator=b'\n') 219 self.bresponse = await asyncio.wait_for(reader, 220 rx_timeout, 221 loop=self.loop) 222 self.response = polystr(self.bresponse) 223 if self.response.startswith( 224 "{") and self.response.endswith("}\r\n"): 225 self.unpack(self.response) 226 self._oldstyle_shim() 227 self.valid |= PACKET_SET 228 return self.data 229 return self.response 230 except asyncio.CancelledError: 231 self.close() 232 raise 233 except Exception as exc: # pylint: disable=W0703 234 error = 'timeout' if isinstance( 235 exc, asyncio.TimeoutError) else exc 236 self.logger.warning( 237 f'Failed to get message from GPSD: {error}') 238 self.close() 239 if self.reconnect: 240 # Try again later 241 await asyncio.sleep(self.reconnect) 242 else: 243 raise 244 245 async def connect(self) -> None: # pylint: disable=W0221 246 """ Connects to GPSD server and starts streaming data """ 247 while not self.writer: 248 try: 249 await self._open_connection() 250 await self.stream() 251 self.logger.info('Connected to gpsd') 252 except asyncio.CancelledError: 253 self.close() 254 raise 255 except Exception as exc: # pylint: disable=W0703 256 error = 'timeout' if isinstance( 257 exc, asyncio.TimeoutError) else exc 258 self.logger.error(f'Failed to connect to GPSD: {error}') 259 self.close() 260 if self.reconnect: 261 # Try again later 262 await asyncio.sleep(self.reconnect) 263 else: 264 raise 265 266 async def send(self, commands) -> None: 267 """ Sends commands """ 268 bcommands = polybytes(commands + "\n") 269 if self.writer: 270 self.writer.write(bcommands) 271 await self.writer.drain() 272 273 async def stream(self, flags: Optional[int] = 0, 274 devpath: Optional[str] = None) -> None: 275 """ Creates and sends the stream command """ 276 if flags > 0: 277 # Update the stream command 278 self.stream_command = self.generate_stream_command(flags, devpath) 279 280 if self.stream_command: 281 self.logger.info(f'Sent stream as: {self.stream_command}') 282 await self.send(self.stream_command) 283 else: 284 raise TypeError(f'Invalid streaming command: {flags}') 285 286 async def __aenter__(self) -> 'aiogps': 287 """ Context manager entry """ 288 return self 289 290 async def __aexit__(self, exc_type, exc, traceback) -> None: 291 """ Context manager exit: close connection """ 292 self.close() 293 294 def __aiter__(self) -> 'aiogps': 295 """ Async iterator interface """ 296 return self 297 298 async def __anext__(self) -> Union[dictwrapper, str]: 299 """ Returns next message from GPSD """ 300 data = await self.read() 301 return data 302 303 def __next__(self) -> Awaitable: 304 """ 305 Reimplementation of the blocking iterator from gps. 306 Returns an awaitable which returns the next message from GPSD. 307 """ 308 return self.read() 309