1# This file is Copyright (c) 2010 by the GPSD project 2# SPDX-License-Identifier: BSD-2-clause 3""" 4A GPS simulator. 5 6This is proof-of-concept code, not production ready; some functions are stubs. 7""" 8import math 9import random 10import sys 11import time 12 13# pylint wants local modules last 14try: 15 import gps 16 import gpslib 17except ImportError as e: 18 sys.stderr.write( 19 "gpssim.py: can't load Python gps libraries -- check PYTHONPATH.\n") 20 sys.stderr.write("%s\n" % e) 21 sys.exit(1) 22 23# First, the mathematics. We simulate a moving viewpoint on the Earth 24# and a satellite with specified orbital elements in the sky. 25 26 27class ksv(object): 28 "Kinematic state vector." 29 30 def __init__(self, time=0, lat=0, lon=0, alt=0, course=0, 31 speed=0, climb=0, h_acc=0, v_acc=0): 32 self.time = time # Seconds from epoch 33 self.lat = lat # Decimal degrees 34 self.lon = lon # Decimal degrees 35 self.alt = alt # Meters 36 self.course = course # Degrees from true North 37 self.speed = speed # Meters per second 38 self.climb = climb # Meters per second 39 self.h_acc = h_acc # Meters per second per second 40 self.v_acc = v_acc # Meters per second per second 41 42 def next(self, quantum=1): 43 "State after quantum." 44 self.time += quantum 45 avspeed = (2 * self.speed + self.h_acc * quantum) / 2 46 avclimb = (2 * self.climb + self.v_acc * quantum) / 2 47 self.alt += avclimb * quantum 48 self.speed += self.h_acc * quantum 49 self.climb += self.v_acc * quantum 50 distance = avspeed * quantum 51 # Formula from <http://williams.best.vwh.net/avform.htm#Rhumb> 52 # Initial point cannot be a pole, but GPS doesn't work at high. 53 # latitudes anyway so it would be OK to fail there. 54 # Seems to assume a spherical Earth, which means it's going 55 # to have a slight inaccuracy rising towards the poles. 56 # The if/then avoids 0/0 indeterminacies on E-W courses. 57 tc = gps.Deg2Rad(self.course) 58 lat = gps.Deg2Rad(self.lat) 59 lon = gps.Deg2Rad(self.lon) 60 lat += distance * math.cos(tc) 61 dphi = math.log(math.tan(lat / 2 + math.pi / 4) / 62 math.tan(self.lat / 2 + math.pi / 4)) 63 if abs(lat - self.lat) < math.sqrt(1e-15): 64 q = math.cos(self.lat) 65 else: 66 q = (lat - self.lat) / dphi 67 dlon = -distance * math.sin(tc) / q 68 self.lon = gps.Rad2Deg(math.mod(lon + dlon + math.pi, 2 * math.pi) - 69 math.pi) 70 self.lat = gps.Rad2Deg(lat) 71 72# Satellite orbital elements are available at: 73# <http://www.ngs.noaa.gov/orbits/> 74# Orbital theory at: 75# <http://www.wolffdata.se/gps/gpshtml/anomalies.html> 76 77 78class satellite(object): 79 "Orbital elements of one satellite. PRESENTLY A STUB" 80 81 def __init__(self, prn): 82 self.prn = prn 83 84 def position(self, time): 85 "Return right ascension and declination of satellite," 86 return 87 88# Next, the command interpreter. This is an object that takes an 89# input source in the track description language, interprets it into 90# sammples that might be reported by a GPS, and calls a reporting 91# class to generate output. 92 93 94class gpssimException(BaseException): 95 def __init__(self, message, filename, lineno): 96 BaseException.__init__(self) 97 self.message = message 98 self.filename = filename 99 self.lineno = lineno 100 101 def __str__(self): 102 return '"%s", %d:' % (self.filename, self.lineno) 103 104 105class gpssim(object): 106 "Simulate a moving sensor, with skyview." 107 active_PRNs = list(range(1, 24 + 1)) + [134, ] 108 109 def __init__(self, outfmt): 110 self.ksv = ksv() 111 self.ephemeris = {} 112 # This sets up satellites at random. Not really what we want. 113 for prn in gpssim.active_PRNs: 114 for (prn, _satellite) in list(self.ephemeris.items()): 115 self.skyview[prn] = (random.randint(-60, +61), 116 random.randint(0, 359)) 117 self.have_ephemeris = False 118 self.channels = {} 119 self.outfmt = outfmt 120 self.status = gps.STATUS_NO_FIX 121 self.mode = gps.MODE_NO_FIX 122 self.validity = "V" 123 self.satellites_used = 0 124 self.filename = None 125 self.lineno = 0 126 127 def parse_tdl(self, line): 128 "Interpret one TDL directive." 129 line = line.strip() 130 if "#" in line: 131 line = line[:line.find("#")] 132 if line == '': 133 return 134 fields = line.split() 135 command = fields[0] 136 if command == "time": 137 self.ksv.time = gps.isotime(fields[1]) 138 elif command == "location": 139 (self.lat, self.lon, self.alt) = list(map(float, fields[1:])) 140 elif command == "course": 141 self.ksv.time = float(fields[1]) 142 elif command == "speed": 143 self.ksv.speed = float(fields[1]) 144 elif command == "climb": 145 self.ksv.climb = float(fields[1]) 146 elif command == "acceleration": 147 (self.ksv.h_acc, self.ksv.h_acc) = list(map(float, fields[1:])) 148 elif command == "snr": 149 self.channels[int(fields[1])] = float(fields[2]) 150 elif command == "go": 151 self.go(int(fields[1])) 152 elif command == "status": 153 try: 154 code = fields[1] 155 self.status = {"no_fix": 0, "fix": 1, "dgps_fix": 2}[ 156 code.lower()] 157 except KeyError: 158 raise gpssimException("invalid status code '%s'" % code, 159 self.filename, self.lineno) 160 elif command == "mode": 161 try: 162 code = fields[1] 163 self.status = {"no_fix": 1, "2d": 2, "3d": 3}[code.lower()] 164 except KeyError: 165 raise gpssimException("invalid mode code '%s'" % code, 166 self.filename, self.lineno) 167 elif command == "satellites": 168 self.satellites_used = int(fields[1]) 169 elif command == "validity": 170 self.validity = fields[1] 171 else: 172 raise gpssimException("unknown command '%s'" % fields[1], 173 self.filename, self.lineno) 174 # FIX-ME: add syntax for ephemeris elements 175 self.lineno += 1 176 177 def filter(self, inp, outp): 178 "Make this a filter for file-like objects." 179 self.filename = input.name 180 self.lineno = 1 181 self.output = outp 182 for line in inp: 183 self.execute(line) 184 185 def go(self, seconds): 186 "Run the simulation for a specified number of seconds." 187 for i in range(seconds): 188 next(self.ksv) 189 if self.have_ephemeris: 190 self.skyview = {} 191 for (prn, satellite) in list(self.ephemeris.items()): 192 self.skyview[prn] = satellite.position(i) 193 self.output.write(self.gpstype.report(self)) 194 195# Reporting classes need to have a report() method returning a string 196# that is a sentence (or possibly several sentences) reporting the 197# state of the simulation. Presently we have only one, for NMEA 198# devices, but the point of the architecture is so that we could simulate 199# others - SirF, Evermore, whatever. 200 201 202MPS_TO_KNOTS = 1.9438445 # Meters per second to knots 203 204 205class NMEA(object): 206 "NMEA output generator." 207 208 def __init__(self): 209 self.sentences = ("RMC", "GGA",) 210 self.counter = 0 211 212 def add_checksum(self, mstr): 213 "Concatenate NMEA checksum and trailer to a string" 214 csum = 0 215 for (i, c) in enumerate(mstr): 216 if i == 0 and c == "$": 217 continue 218 csum ^= ord(c) 219 mstr += "*%02X\r\n" % csum 220 return mstr 221 222 def degtodm(self, angle): 223 "Decimal degrees to GPS-style, degrees first followed by minutes." 224 (fraction, _integer) = math.modf(angle) 225 return math.floor(angle) * 100 + fraction * 60 226 227 def GGA(self, sim): 228 "Emit GGA sentence describing the simulation state." 229 tm = time.gmtime(sim.ksv.time) 230 gga = "$GPGGA,%02d%02d%02d,%09.4f,%c,%010.4f,%c,%d,%02d," % ( 231 tm.tm_hour, 232 tm.tm_min, 233 tm.tm_sec, 234 self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0], 235 self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0], 236 sim.status, 237 sim.satellites_used) 238 # HDOP calculation goes here 239 gga += "," 240 if sim.mode == gps.MODE_3D: 241 gga += "%.1f,M" % self.ksv.lat 242 gga += "," 243 gga += "%.3f,M," % gpslib.wg84_separation(sim.ksv.lat, sim.ksv.lon) 244 # Magnetic variation goes here 245 # gga += "%3.2f,M," % mag_var 246 gga += ",," 247 # Time in seconds since last DGPS update goes here 248 gga += "," 249 # DGPS station ID goes here 250 return self.add_checksum(gga) 251 252 def GLL(self, sim): 253 "Emit GLL sentence describing the simulation state." 254 tm = time.gmtime(sim.ksv.time) 255 gll = "$GPLL,%09.4f,%c,%010.4f,%c,%02d%02d%02d,%s," % ( 256 self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0], 257 self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0], 258 tm.tm_hour, 259 tm.tm_min, 260 tm.tm_sec, 261 sim.validity, ) 262 # FAA mode indicator could go after these fields. 263 return self.add_checksum(gll) 264 265 def RMC(self, sim): 266 "Emit RMC sentence describing the simulation state." 267 tm = time.gmtime(sim.ksv.time) 268 rmc = \ 269 "GPRMC,%02d%02d%02d,%s,%09.4f,%c,%010.4f,%c,%.1f,%02d%02d%02d," % ( 270 tm.tm_hour, 271 tm.tm_min, 272 tm.tm_sec, 273 sim.validity, 274 self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0], 275 self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0], 276 sim.course * MPS_TO_KNOTS, 277 tm.tm_mday, 278 tm.tm_mon, 279 tm.tm_year % 100) 280 # Magnetic variation goes here 281 # rmc += "%3.2f,M," % mag_var 282 rmc += ",," 283 # FAA mode goes here 284 return self.add_checksum(rmc) 285 286 def ZDA(self, sim): 287 "Emit ZDA sentence describing the simulation state." 288 tm = time.gmtime(sim.ksv.time) 289 zda = "$GPZDA,%02d%2d%02d,%02d,%02d,%04d" % ( 290 tm.tm_hour, 291 tm.tm_min, 292 tm.tm_sec, 293 tm.tm_mday, 294 tm.tm_mon, 295 tm.tm_year, ) 296 # Local zone description, 00 to +- 13 hours, goes here 297 zda += "," 298 # Local zone minutes description goes here 299 zda += "," 300 return self.add_checksum(zda) 301 302 def report(self, sim): 303 "Report the simulation state." 304 out = "" 305 for sentence in self.sentences: 306 if isinstance(sentence, tuple): 307 (interval, sentence) = sentence 308 if self.counter % interval: 309 continue 310 out += getattr(self, sentence)(*[sim]) 311 self.counter += 1 312 return out 313 314# The very simple main line. 315 316 317if __name__ == "__main__": 318 try: 319 gpssim(NMEA).filter(sys.stdin, sys.stdout) 320 except gpssimException as e: 321 sys.stderr.write(repr(e) + "\n") 322 323# gpssim.py ends here. 324