1# This file is Copyright (c) 2010 by the GPSD project 2# BSD terms apply: see the file COPYING in the distribution root for details. 3""" 4gpsfake.py -- classes for creating a controlled test environment around gpsd. 5 6The gpsfake(1) regression tester shipped with GPSD is a trivial wrapper 7around this code. For a more interesting usage example, see the 8valgrind-audit script shipped with the GPSD code. 9 10To use this code, start by instantiating a TestSession class. Use the 11prefix argument if you want to run the daemon under some kind of run-time 12monitor like valgrind or gdb. Here are some particularly useful possibilities: 13 14valgrind --tool=memcheck --gen-suppressions=yes --leak-check=yes 15 Run under Valgrind, checking for malloc errors and memory leaks. 16 17xterm -e gdb -tui --args 18 Run under gdb, controlled from a new xterm. 19 20You can use the options argument to pass in daemon options; normally you will 21use this to set the debug-logging level. 22 23On initialization, the test object spawns an instance of gpsd with no 24devices or clients attached, connected to a control socket. 25 26TestSession has methods to attach and detch fake GPSes. The 27TestSession class simulates GPS devices for you with objects composed 28from a pty and a class instance that cycles sentences into the master side 29from some specified logfile; gpsd reads the slave side. A fake GPS is 30identified by the string naming its slave device. 31 32TestSession also has methods to start and end client sessions. Daemon 33responses to a client are fed to a hook function which, by default, 34discards them. Note that this data is 'bytes' to accommodate possible 35binary data in Python 3; use polystr() if you need a str. You can 36change the hook to misc.get_bytes_stream(sys.stdout).write to dump 37responses to standard output (this is what the gpsfake executable does) 38or do something more exotic. A client session is identified by a small 39integer that counts the number of client session starts. 40 41There are a couple of convenience methods. TestSession.wait() does nothing, 42allowing a specified number of seconds to elapse. TestSession.send() 43ships commands to an open client session. 44 45TestSession does not currently capture the daemon's log output. It is 46run with -N, so the output will go to stderr (along with, for example, 47Valgrind notifications). 48 49Each FakeGPS instance tries to packetize the data from the logfile it 50is initialized with. It uses the same packet-getter as the daemon. 51Exception: if there is a Delay-Cookie line in a header comment, that 52delimiter is used to split up the test load. 53 54The TestSession code maintains a run queue of FakeGPS and gps.gs 55(client- session) objects. It repeatedly cycles through the run queue. 56For each client session object in the queue, it tries to read data 57from gpsd. For each fake GPS, it sends one line or packet of stored 58data. When a fake-GPS's go predicate becomes false, the fake GPS is 59removed from the run queue. 60 61There are two ways to use this code. The more deterministic is 62non-threaded mode: set up your client sessions and fake GPS devices, 63then call the run() method. The run() method will terminate when 64there are no more objects in the run queue. Note, you must have 65created at least one fake client or fake GPS before calling run(), 66otherwise it will terminate immediately. 67 68To allow for adding and removing clients while the test is running, 69run in threaded mode by calling the start() method. This simply calls 70the run method in a subthread, with locking of critical regions. 71""" 72# This code runs compatibly under Python 2 and 3.x for x >= 2. 73# Preserve this property! 74from __future__ import absolute_import, print_function, division 75 76import os 77import pty 78import select 79import signal 80import socket 81import stat 82import subprocess 83import sys 84import termios # fcntl, array, struct 85import threading 86import time 87 88import gps 89from . import packet as sniffer 90 91# The magic number below has to be derived from observation. If 92# it's too high you'll slow the tests down a lot. If it's too low 93# you'll get regression tests timing out. 94 95# WRITE_PAD: Define a per-line delay on writes so we won't spam the 96# buffers in the pty layer or gpsd itself. Values smaller than the 97# system timer tick don't make any difference here. Can be set from 98# WRITE_PAD in the environment. 99 100if sys.platform.startswith("linux"): 101 WRITE_PAD = 0.0 102elif sys.platform.startswith("freebsd"): 103 WRITE_PAD = 0.01 104elif sys.platform.startswith("netbsd5"): 105 WRITE_PAD = 0.200 106elif sys.platform.startswith("netbsd"): 107 WRITE_PAD = 0.01 108elif sys.platform.startswith("darwin"): 109 # darwin Darwin-13.4.0-x86_64-i386-64bit 110 WRITE_PAD = 0.005 111else: 112 WRITE_PAD = 0.004 113 114# Additional delays in slow mode 115WRITE_PAD_SLOWDOWN = 0.01 116 117# If a test takes longer than this, we deem it to have timed out 118TEST_TIMEOUT = 60 119 120 121def GetDelay(slow=False): 122 "Get appropriate per-line delay." 123 delay = float(os.getenv("WRITE_PAD", WRITE_PAD)) 124 if slow: 125 delay += WRITE_PAD_SLOWDOWN 126 return delay 127 128 129class TestError(BaseException): 130 "Class TestError" 131 def __init__(self, msg): 132 super(TestError, self).__init__() 133 self.msg = msg 134 135 136class TestLoadError(TestError): 137 "Class TestLoadError, empty" 138 139 140class TestLoad(object): 141 "Digest a logfile into a list of sentences we can cycle through." 142 143 def __init__(self, logfp, predump=False, slow=False, oneshot=False): 144 self.sentences = [] # This is the interesting part 145 if isinstance(logfp, str): 146 logfp = open(logfp, "rb") 147 self.name = logfp.name 148 self.logfp = logfp 149 self.predump = predump 150 self.type = None 151 self.sourcetype = "pty" 152 self.serial = None 153 self.delay = GetDelay(slow) 154 self.delimiter = None 155 # Stash away a copy in case we need to resplit 156 text = logfp.read() 157 logfp = open(logfp.name, 'rb') 158 # Grab the packets in the normal way 159 getter = sniffer.new() 160 # gps.packet.register_report(reporter) 161 type_latch = None 162 commentlen = 0 163 while True: 164 # Note that packet data is bytes rather than str 165 (plen, ptype, packet, _counter) = getter.get(logfp.fileno()) 166 if plen <= 0: 167 break 168 elif ptype == sniffer.COMMENT_PACKET: 169 commentlen += len(packet) 170 # Some comments are magic 171 if b"Serial:" in packet: 172 # Change serial parameters 173 packet = packet[1:].strip() 174 try: 175 (_xx, baud, params) = packet.split() 176 baud = int(baud) 177 if params[0] in (b'7', b'8'): 178 databits = int(params[0]) 179 else: 180 raise ValueError 181 if params[1] in (b'N', b'O', b'E'): 182 parity = params[1] 183 else: 184 raise ValueError 185 if params[2] in (b'1', b'2'): 186 stopbits = int(params[2]) 187 else: 188 raise ValueError 189 except (ValueError, IndexError): 190 raise TestLoadError("bad serial-parameter spec in %s" % 191 self.name) 192 self.serial = (baud, databits, parity, stopbits) 193 elif b"Transport: UDP" in packet: 194 self.sourcetype = "UDP" 195 elif b"Transport: TCP" in packet: 196 self.sourcetype = "TCP" 197 elif b"Delay-Cookie:" in packet: 198 if packet.startswith(b"#"): 199 packet = packet[1:] 200 try: 201 (_dummy, self.delimiter, delay) = \ 202 packet.strip().split() 203 self.delay = float(delay) 204 except ValueError: 205 raise TestLoadError("bad Delay-Cookie line in %s" % 206 self.name) 207 self.resplit = True 208 else: 209 if type_latch is None: 210 type_latch = ptype 211 if self.predump: 212 print(repr(packet)) 213 if not packet: 214 raise TestLoadError("zero-length packet from %s" % 215 self.name) 216 self.sentences.append(packet) 217 # Look at the first packet to grok the GPS type 218 self.textual = (type_latch == sniffer.NMEA_PACKET) 219 if self.textual: 220 self.legend = "gpsfake: line %d: " 221 else: 222 self.legend = "gpsfake: packet %d" 223 # Maybe this needs to be split on different delimiters? 224 if self.delimiter is not None: 225 self.sentences = text[commentlen:].split(self.delimiter) 226 # Do we want single-shot operation? 227 if oneshot: 228 self.sentences.append(b"# EOF\n") 229 230 231class PacketError(TestError): 232 "Class PacketError, empty" 233 234 235class FakeGPS(object): 236 "Class FakeGPS" 237 def __init__(self, testload, progress=lambda x: None): 238 self.exhausted = 0 239 self.go_predicate = lambda: True 240 self.index = 0 241 self.progress = progress 242 self.readers = 0 243 self.testload = testload 244 self.progress("gpsfake: %s provides %d sentences\n" 245 % (self.testload.name, len(self.testload.sentences))) 246 247 def write(self, line): 248 "Throw an error if this superclass is ever instantiated." 249 raise ValueError(line) 250 251 def feed(self): 252 "Feed a line from the contents of the GPS log to the daemon." 253 line = self.testload.sentences[self.index 254 % len(self.testload.sentences)] 255 if b"%Delay:" in line: 256 # Delay specified number of seconds 257 delay = line.split()[1] 258 time.sleep(int(delay)) 259 # self.write has to be set by the derived class 260 self.write(line) 261 time.sleep(self.testload.delay) 262 self.index += 1 263 264 265class FakePTY(FakeGPS): 266 "A FakePTY is a pty with a test log ready to be cycled to it." 267 268 def __init__(self, testload, 269 speed=4800, databits=8, parity='N', stopbits=1, 270 progress=lambda x: None): 271 super(FakePTY, self).__init__(testload, progress) 272 # Allow Serial: header to be overridden by explicit speed. 273 if self.testload.serial: 274 (speed, databits, parity, stopbits) = self.testload.serial 275 self.speed = speed 276 baudrates = { 277 0: termios.B0, 278 50: termios.B50, 279 75: termios.B75, 280 110: termios.B110, 281 134: termios.B134, 282 150: termios.B150, 283 200: termios.B200, 284 300: termios.B300, 285 600: termios.B600, 286 1200: termios.B1200, 287 1800: termios.B1800, 288 2400: termios.B2400, 289 4800: termios.B4800, 290 9600: termios.B9600, 291 19200: termios.B19200, 292 38400: termios.B38400, 293 57600: termios.B57600, 294 115200: termios.B115200, 295 230400: termios.B230400, 296 } 297 (self.fd, self.slave_fd) = pty.openpty() 298 self.byname = os.ttyname(self.slave_fd) 299 os.chmod(self.byname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | 300 stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH) 301 (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = termios.tcgetattr( 302 self.slave_fd) 303 cc[termios.VMIN] = 1 304 cflag &= ~(termios.PARENB | termios.PARODD | termios.CRTSCTS) 305 cflag |= termios.CREAD | termios.CLOCAL 306 iflag = oflag = lflag = 0 307 iflag &= ~ (termios.PARMRK | termios.INPCK) 308 cflag &= ~ (termios.CSIZE | termios.CSTOPB | termios.PARENB | 309 termios.PARODD) 310 if databits == 7: 311 cflag |= termios.CS7 312 else: 313 cflag |= termios.CS8 314 if stopbits == 2: 315 cflag |= termios.CSTOPB 316 # Warning: attempting to set parity makes Fedora lose its cookies 317 if parity == 'E': 318 iflag |= termios.INPCK 319 cflag |= termios.PARENB 320 elif parity == 'O': 321 iflag |= termios.INPCK 322 cflag |= termios.PARENB | termios.PARODD 323 ispeed = ospeed = baudrates[speed] 324 try: 325 termios.tcsetattr(self.slave_fd, termios.TCSANOW, 326 [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]) 327 except termios.error: 328 raise TestLoadError("error attempting to set serial mode to %s " 329 " %s%s%s" 330 % (speed, databits, parity, stopbits)) 331 332 def read(self): 333 "Discard control strings written by gpsd." 334 # A tcflush implementation works on Linux but fails on OpenBSD 4. 335 termios.tcflush(self.fd, termios.TCIFLUSH) 336 # Alas, the FIONREAD version also works on Linux and fails on OpenBSD. 337 # try: 338 # buf = array.array('i', [0]) 339 # fcntl.ioctl(self.master_fd, termios.FIONREAD, buf, True) 340 # n = struct.unpack('i', buf)[0] 341 # os.read(self.master_fd, n) 342 # except IOError: 343 # pass 344 345 def write(self, line): 346 self.progress("gpsfake: %s writes %d=%s\n" 347 % (self.testload.name, len(line), repr(line))) 348 os.write(self.fd, line) 349 350 def drain(self): 351 "Wait for the associated device to drain (e.g. before closing)." 352 termios.tcdrain(self.fd) 353 354 355def cleansocket(host, port, socktype=socket.SOCK_STREAM): 356 "Get a socket that we can re-use cleanly after it's closed." 357 cs = socket.socket(socket.AF_INET, socktype) 358 # This magic prevents "Address already in use" errors after 359 # we release the socket. 360 cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 361 cs.bind((host, port)) 362 return cs 363 364 365def freeport(socktype=socket.SOCK_STREAM): 366 """Get a free port number for the given connection type. 367 368 This lets the OS assign a unique port, and then assumes 369 that it will become available for reuse once the socket 370 is closed, and remain so long enough for the real use. 371 """ 372 s = cleansocket("127.0.0.1", 0, socktype) 373 port = s.getsockname()[1] 374 s.close() 375 return port 376 377 378class FakeTCP(FakeGPS): 379 "A TCP serverlet with a test log ready to be cycled to it." 380 381 def __init__(self, testload, 382 host, port, 383 progress=lambda x: None): 384 super(FakeTCP, self).__init__(testload, progress) 385 self.host = host 386 self.dispatcher = cleansocket(self.host, int(port)) 387 # Get actual assigned port 388 self.port = self.dispatcher.getsockname()[1] 389 self.byname = "tcp://" + host + ":" + str(self.port) 390 self.dispatcher.listen(5) 391 self.readables = [self.dispatcher] 392 393 def read(self): 394 "Handle connection requests and data." 395 readable, _writable, _errored = select.select(self.readables, [], [], 396 0) 397 for s in readable: 398 if s == self.dispatcher: # Connection request 399 client_socket, _address = s.accept() 400 self.readables = [client_socket] 401 # Depending on timing, gpsd may try to reconnect between the 402 # end of the log data and the remove_device. With no listener, 403 # this results in spurious error messages. Keeping the 404 # listener around avoids this. It will eventually be closed 405 # by the Python object cleanup. self.dispatcher.close() 406 else: # Incoming data 407 data = s.recv(1024) 408 if not data: 409 s.close() 410 self.readables.remove(s) 411 412 def write(self, line): 413 "Send the next log packet to everybody connected." 414 self.progress("gpsfake: %s writes %d=%s\n" 415 % (self.testload.name, len(line), repr(line))) 416 for s in self.readables: 417 if s != self.dispatcher: 418 s.send(line) 419 420 def drain(self): 421 "Wait for the associated device(s) to drain (e.g. before closing)." 422 for s in self.readables: 423 if s != self.dispatcher: 424 s.shutdown(socket.SHUT_RDWR) 425 426 427class FakeUDP(FakeGPS): 428 "A UDP broadcaster with a test log ready to be cycled to it." 429 430 def __init__(self, testload, 431 ipaddr, port, 432 progress=lambda x: None): 433 super(FakeUDP, self).__init__(testload, progress) 434 self.byname = "udp://" + ipaddr + ":" + str(port) 435 self.ipaddr = ipaddr 436 self.port = port 437 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 438 439 def read(self): 440 "Discard control strings written by gpsd." 441 return 442 443 def write(self, line): 444 self.progress("gpsfake: %s writes %d=%s\n" 445 % (self.testload.name, len(line), repr(line))) 446 self.sock.sendto(line, (self.ipaddr, int(self.port))) 447 448 def drain(self): 449 "Wait for the associated device to drain (e.g. before closing)." 450 # shutdown() fails on UDP 451 return # shutdown() fails on UDP 452 453 454class SubprogramError(TestError): 455 "Class SubprogramError" 456 def __str__(self): 457 return repr(self.msg) 458 459 460class SubprogramInstance(object): 461 "Class for generic subprogram." 462 ERROR = SubprogramError 463 464 def __init__(self): 465 self.spawncmd = None 466 self.process = None 467 self.returncode = None 468 self.env = None 469 470 def spawn_sub(self, program, options, background=False, prefix="", 471 env=None): 472 "Spawn a subprogram instance." 473 spawncmd = None 474 475 # Look for program in GPSD_HOME env variable 476 if os.environ.get('GPSD_HOME'): 477 for path in os.environ['GPSD_HOME'].split(':'): 478 _spawncmd = "%s/%s" % (path, program) 479 if os.path.isfile(_spawncmd) and os.access(_spawncmd, os.X_OK): 480 spawncmd = _spawncmd 481 break 482 483 # if we could not find it yet try PATH env variable for it 484 if not spawncmd: 485 if '/usr/sbin' not in os.environ['PATH']: 486 os.environ['PATH'] = os.environ['PATH'] + ":/usr/sbin" 487 for path in os.environ['PATH'].split(':'): 488 _spawncmd = "%s/%s" % (path, program) 489 if os.path.isfile(_spawncmd) and os.access(_spawncmd, os.X_OK): 490 spawncmd = _spawncmd 491 break 492 493 if not spawncmd: 494 raise self.ERROR("Cannot execute %s: executable not found. " 495 "Set GPSD_HOME env variable" % program) 496 self.spawncmd = [spawncmd] + options.split() 497 if prefix: 498 self.spawncmd = prefix.split() + self.spawncmd 499 if env: 500 self.env = os.environ.copy() 501 self.env.update(env) 502 self.process = subprocess.Popen(self.spawncmd, env=self.env) 503 if not background: 504 self.returncode = status = self.process.wait() 505 if os.WIFSIGNALED(status) or os.WEXITSTATUS(status): 506 raise self.ERROR("%s exited with status %d" 507 % (program, status)) 508 509 def is_alive(self): 510 "Is the program still alive?" 511 if not self.process: 512 return False 513 self.returncode = self.process.poll() 514 if self.returncode is None: 515 return True 516 self.process = None 517 return False 518 519 def kill(self): 520 "Kill the program instance." 521 while self.is_alive(): 522 try: # terminate() may fail if already killed 523 self.process.terminate() 524 except OSError: 525 continue 526 time.sleep(0.01) 527 528 529class DaemonError(SubprogramError): 530 "Class DaemonError" 531 532 533class DaemonInstance(SubprogramInstance): 534 "Control a gpsd instance." 535 ERROR = DaemonError 536 537 def __init__(self, control_socket=None): 538 self.sock = None 539 super(DaemonInstance, self).__init__() 540 if control_socket: 541 self.control_socket = control_socket 542 else: 543 tmpdir = os.environ.get('TMPDIR', '/tmp') 544 self.control_socket = "%s/gpsfake-%d.sock" % (tmpdir, os.getpid()) 545 546 def spawn(self, options, port, background=False, prefix=""): 547 "Spawn a daemon instance." 548 # The -b option to suppress hanging on probe returns is needed to cope 549 # with OpenBSD (and possibly other non-Linux systems) that don't 550 # support anything we can use to implement the FakeGPS.read() method 551 opts = (" -b -N -S %s -F %s %s" 552 % (port, self.control_socket, options)) 553 # Derive a unique SHM key from the port # to avoid collisions. 554 # Use 'Gp' as the prefix to avoid colliding with 'GPSD'. 555 shmkey = '0x4770%.04X' % int(port) 556 env = {'GPSD_SHM_KEY': shmkey} 557 self.spawn_sub('gpsd', opts, background, prefix, env) 558 559 def wait_ready(self): 560 "Wait for the daemon to create the control socket." 561 while self.is_alive(): 562 if os.path.exists(self.control_socket): 563 return 564 time.sleep(0.1) 565 566 def __get_control_socket(self): 567 # Now we know it's running, get a connection to the control socket. 568 if not os.path.exists(self.control_socket): 569 return None 570 try: 571 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 572 self.sock.connect(self.control_socket) 573 except socket.error: 574 if self.sock: 575 self.sock.close() 576 self.sock = None 577 return self.sock 578 579 def add_device(self, path): 580 "Add a device to the daemon's internal search list." 581 if self.__get_control_socket(): 582 self.sock.sendall(gps.polybytes("+%s\r\n\x00" % path)) 583 self.sock.recv(12) 584 self.sock.close() 585 586 def remove_device(self, path): 587 "Remove a device from the daemon's internal search list." 588 if self.__get_control_socket(): 589 self.sock.sendall(gps.polybytes("-%s\r\n\x00" % path)) 590 self.sock.recv(12) 591 self.sock.close() 592 593 594class TestSessionError(TestError): 595 "class TestSessionError" 596 # why does testSessionError() do nothing? " 597 598 599class TestSession(object): 600 "Manage a session including a daemon with fake GPSes and clients." 601 602 def __init__(self, prefix=None, port=None, options=None, verbose=0, 603 predump=False, udp=False, tcp=False, slow=False, 604 timeout=None): 605 "Initialize the test session by launching the daemon." 606 self.prefix = prefix 607 self.options = options 608 self.verbose = verbose 609 self.predump = predump 610 self.udp = udp 611 self.tcp = tcp 612 self.slow = slow 613 self.daemon = DaemonInstance() 614 self.fakegpslist = {} 615 self.client_id = 0 616 self.readers = 0 617 self.writers = 0 618 self.runqueue = [] 619 self.index = 0 620 if port: 621 self.port = port 622 else: 623 self.port = freeport() 624 self.progress = lambda x: None 625 # for debugging 626 # self.progress = lambda x: sys.stderr.write("# Hi " + x) 627 self.reporter = lambda x: None 628 self.default_predicate = None 629 self.fd_set = [] 630 self.threadlock = None 631 self.timeout = TEST_TIMEOUT if timeout is None else timeout 632 633 def spawn(self): 634 "Spawn daemon" 635 for sig in (signal.SIGQUIT, signal.SIGINT, signal.SIGTERM): 636 signal.signal(sig, lambda unused, dummy: self.cleanup()) 637 self.daemon.spawn(background=True, prefix=self.prefix, port=self.port, 638 options=self.options) 639 self.daemon.wait_ready() 640 641 def set_predicate(self, pred): 642 "Set a default go predicate for the session." 643 self.default_predicate = pred 644 645 def gps_add(self, logfile, speed=19200, pred=None, oneshot=False): 646 "Add a simulated GPS being fed by the specified logfile." 647 self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed)) 648 if logfile not in self.fakegpslist: 649 testload = TestLoad(logfile, predump=self.predump, slow=self.slow, 650 oneshot=oneshot) 651 if testload.sourcetype == "UDP" or self.udp: 652 newgps = FakeUDP(testload, ipaddr="127.0.0.1", 653 port=freeport(socket.SOCK_DGRAM), 654 progress=self.progress) 655 elif testload.sourcetype == "TCP" or self.tcp: 656 # Let OS assign the port 657 newgps = FakeTCP(testload, host="127.0.0.1", port=0, 658 progress=self.progress) 659 else: 660 newgps = FakePTY(testload, speed=speed, 661 progress=self.progress) 662 if pred: 663 newgps.go_predicate = pred 664 elif self.default_predicate: 665 newgps.go_predicate = self.default_predicate 666 self.fakegpslist[newgps.byname] = newgps 667 self.append(newgps) 668 newgps.exhausted = 0 669 self.daemon.add_device(newgps.byname) 670 return newgps.byname 671 672 def gps_remove(self, name): 673 "Remove a simulated GPS from the daemon's search list." 674 self.progress("gpsfake: gps_remove(%s)\n" % name) 675 self.fakegpslist[name].drain() 676 self.remove(self.fakegpslist[name]) 677 self.daemon.remove_device(name) 678 del self.fakegpslist[name] 679 680 def client_add(self, commands): 681 "Initiate a client session and force connection to a fake GPS." 682 self.progress("gpsfake: client_add()\n") 683 try: 684 newclient = gps.gps(port=self.port, verbose=self.verbose) 685 except socket.error: 686 if not self.daemon.is_alive(): 687 raise TestSessionError("daemon died") 688 raise 689 self.append(newclient) 690 newclient.id = self.client_id + 1 691 self.client_id += 1 692 self.progress("gpsfake: client %d has %s\n" 693 % (self.client_id, newclient.device)) 694 if commands: 695 self.initialize(newclient, commands) 696 return self.client_id 697 698 def client_remove(self, cid): 699 "Terminate a client session." 700 self.progress("gpsfake: client_remove(%d)\n" % cid) 701 for obj in self.runqueue: 702 if isinstance(obj, gps.gps) and obj.id == cid: 703 self.remove(obj) 704 return True 705 return False 706 707 def wait(self, seconds): 708 "Wait, doing nothing." 709 self.progress("gpsfake: wait(%d)\n" % seconds) 710 time.sleep(seconds) 711 712 def gather(self, seconds): 713 "Wait, doing nothing but watching for sentences." 714 self.progress("gpsfake: gather(%d)\n" % seconds) 715 time.sleep(seconds) 716 717 def cleanup(self): 718 "We're done, kill the daemon." 719 self.progress("gpsfake: cleanup()\n") 720 if self.daemon: 721 self.daemon.kill() 722 self.daemon = None 723 724 def run(self): 725 "Run the tests." 726 try: 727 self.progress("gpsfake: test loop begins\n") 728 while self.daemon: 729 if not self.daemon.is_alive(): 730 raise TestSessionError("daemon died") 731 # We have to read anything that gpsd might have tried 732 # to send to the GPS here -- under OpenBSD the 733 # TIOCDRAIN will hang, otherwise. 734 for device in self.runqueue: 735 if isinstance(device, FakeGPS): 736 device.read() 737 had_output = False 738 chosen = self.choose() 739 if isinstance(chosen, FakeGPS): 740 if (((chosen.exhausted and self.timeout and 741 (time.time() - chosen.exhausted > self.timeout) and 742 chosen.byname in self.fakegpslist))): 743 sys.stderr.write( 744 "Test timed out: maybe increase WRITE_PAD (= %s)\n" 745 % GetDelay(self.slow)) 746 raise SystemExit(1) 747 748 if not chosen.go_predicate(chosen.index, chosen): 749 if chosen.exhausted == 0: 750 chosen.exhausted = time.time() 751 self.progress("gpsfake: GPS %s ran out of input\n" 752 % chosen.byname) 753 else: 754 chosen.feed() 755 elif isinstance(chosen, gps.gps): 756 if chosen.enqueued: 757 chosen.send(chosen.enqueued) 758 chosen.enqueued = "" 759 while chosen.waiting(): 760 if not self.daemon or not self.daemon.is_alive(): 761 raise TestSessionError("daemon died") 762 if chosen.read() < 0: 763 raise TestSessionError("daemon output stopped") 764 had_output = True 765 if not chosen.valid & gps.PACKET_SET: 766 continue 767 self.reporter(chosen.bresponse) 768 if ((chosen.data["class"] == "DEVICE" and 769 chosen.data["activated"] == 0 and 770 chosen.data["path"] in self.fakegpslist)): 771 self.gps_remove(chosen.data["path"]) 772 self.progress( 773 "gpsfake: GPS %s removed (notification)\n" 774 % chosen.data["path"]) 775 else: 776 raise TestSessionError("test object of unknown type") 777 if not self.writers and not had_output: 778 self.progress("gpsfake: no writers and no output\n") 779 break 780 self.progress("gpsfake: test loop ends\n") 781 finally: 782 self.cleanup() 783 784 # All knowledge about locks and threading is below this line, 785 # except for the bare fact that self.threadlock is set to None 786 # in the class init method. 787 788 def append(self, obj): 789 "Add a producer or consumer to the object list." 790 if self.threadlock: 791 self.threadlock.acquire() 792 self.runqueue.append(obj) 793 if isinstance(obj, FakeGPS): 794 self.writers += 1 795 elif isinstance(obj, gps.gps): 796 self.readers += 1 797 if self.threadlock: 798 self.threadlock.release() 799 800 def remove(self, obj): 801 "Remove a producer or consumer from the object list." 802 if self.threadlock: 803 self.threadlock.acquire() 804 self.runqueue.remove(obj) 805 if isinstance(obj, FakeGPS): 806 self.writers -= 1 807 elif isinstance(obj, gps.gps): 808 self.readers -= 1 809 self.index = min(len(self.runqueue) - 1, self.index) 810 if self.threadlock: 811 self.threadlock.release() 812 813 def choose(self): 814 "Atomically get the next object scheduled to do something." 815 if self.threadlock: 816 self.threadlock.acquire() 817 chosen = self.index 818 self.index += 1 819 self.index %= len(self.runqueue) 820 if self.threadlock: 821 self.threadlock.release() 822 return self.runqueue[chosen] 823 824 def initialize(self, client, commands): 825 "Arrange for client to ship specified commands when it goes active." 826 client.enqueued = "" 827 if not self.threadlock: 828 client.send(commands) 829 else: 830 client.enqueued = commands 831 832 def start(self): 833 "Start thread" 834 self.threadlock = threading.Lock() 835 threading.Thread(target=self.run) 836 837# End 838