1# This file is Copyright (c) 2010 by the GPSD project
2# SPDX-License-Identifier: BSD-2-clause
3"""
4A GPS simulator.
5
6This is proof-of-concept code, not production ready; some functions are stubs.
7"""
8import math
9import random
10import sys
11import time
12
13# pylint wants local modules last
14try:
15    import gps
16    import gpslib
17except ImportError as e:
18    sys.stderr.write(
19        "gpssim.py: can't load Python gps libraries -- check PYTHONPATH.\n")
20    sys.stderr.write("%s\n" % e)
21    sys.exit(1)
22
23# First, the mathematics.  We simulate a moving viewpoint on the Earth
24# and a satellite with specified orbital elements in the sky.
25
26
27class ksv(object):
28    "Kinematic state vector."
29
30    def __init__(self, time=0, lat=0, lon=0, alt=0, course=0,
31                 speed=0, climb=0, h_acc=0, v_acc=0):
32        self.time = time        # Seconds from epoch
33        self.lat = lat          # Decimal degrees
34        self.lon = lon          # Decimal degrees
35        self.alt = alt          # Meters
36        self.course = course    # Degrees from true North
37        self.speed = speed      # Meters per second
38        self.climb = climb      # Meters per second
39        self.h_acc = h_acc      # Meters per second per second
40        self.v_acc = v_acc      # Meters per second per second
41
42    def next(self, quantum=1):
43        "State after quantum."
44        self.time += quantum
45        avspeed = (2 * self.speed + self.h_acc * quantum) / 2
46        avclimb = (2 * self.climb + self.v_acc * quantum) / 2
47        self.alt += avclimb * quantum
48        self.speed += self.h_acc * quantum
49        self.climb += self.v_acc * quantum
50        distance = avspeed * quantum
51        # Formula from <http://williams.best.vwh.net/avform.htm#Rhumb>
52        # Initial point cannot be a pole, but GPS doesn't work at high.
53        # latitudes anyway so it would be OK to fail there.
54        # Seems to assume a spherical Earth, which means it's going
55        # to have a slight inaccuracy rising towards the poles.
56        # The if/then avoids 0/0 indeterminacies on E-W courses.
57        tc = gps.Deg2Rad(self.course)
58        lat = gps.Deg2Rad(self.lat)
59        lon = gps.Deg2Rad(self.lon)
60        lat += distance * math.cos(tc)
61        dphi = math.log(math.tan(lat / 2 + math.pi / 4) /
62                        math.tan(self.lat / 2 + math.pi / 4))
63        if abs(lat - self.lat) < math.sqrt(1e-15):
64            q = math.cos(self.lat)
65        else:
66            q = (lat - self.lat) / dphi
67        dlon = -distance * math.sin(tc) / q
68        self.lon = gps.Rad2Deg(math.mod(lon + dlon + math.pi, 2 * math.pi) -
69                               math.pi)
70        self.lat = gps.Rad2Deg(lat)
71
72# Satellite orbital elements are available at:
73# <http://www.ngs.noaa.gov/orbits/>
74# Orbital theory at:
75# <http://www.wolffdata.se/gps/gpshtml/anomalies.html>
76
77
78class satellite(object):
79    "Orbital elements of one satellite. PRESENTLY A STUB"
80
81    def __init__(self, prn):
82        self.prn = prn
83
84    def position(self, time):
85        "Return right ascension and declination of satellite,"
86        return
87
88# Next, the command interpreter.  This is an object that takes an
89# input source in the track description language, interprets it into
90# sammples that might be reported by a GPS, and calls a reporting
91# class to generate output.
92
93
94class gpssimException(BaseException):
95    def __init__(self, message, filename, lineno):
96        BaseException.__init__(self)
97        self.message = message
98        self.filename = filename
99        self.lineno = lineno
100
101    def __str__(self):
102        return '"%s", %d:' % (self.filename, self.lineno)
103
104
105class gpssim(object):
106    "Simulate a moving sensor, with skyview."
107    active_PRNs = list(range(1, 24 + 1)) + [134, ]
108
109    def __init__(self, outfmt):
110        self.ksv = ksv()
111        self.ephemeris = {}
112        # This sets up satellites at random.  Not really what we want.
113        for prn in gpssim.active_PRNs:
114            for (prn, _satellite) in list(self.ephemeris.items()):
115                self.skyview[prn] = (random.randint(-60, +61),
116                                     random.randint(0, 359))
117        self.have_ephemeris = False
118        self.channels = {}
119        self.outfmt = outfmt
120        self.status = gps.STATUS_NO_FIX
121        self.mode = gps.MODE_NO_FIX
122        self.validity = "V"
123        self.satellites_used = 0
124        self.filename = None
125        self.lineno = 0
126
127    def parse_tdl(self, line):
128        "Interpret one TDL directive."
129        line = line.strip()
130        if "#" in line:
131            line = line[:line.find("#")]
132        if line == '':
133            return
134        fields = line.split()
135        command = fields[0]
136        if command == "time":
137            self.ksv.time = gps.isotime(fields[1])
138        elif command == "location":
139            (self.lat, self.lon, self.alt) = list(map(float, fields[1:]))
140        elif command == "course":
141            self.ksv.time = float(fields[1])
142        elif command == "speed":
143            self.ksv.speed = float(fields[1])
144        elif command == "climb":
145            self.ksv.climb = float(fields[1])
146        elif command == "acceleration":
147            (self.ksv.h_acc, self.ksv.h_acc) = list(map(float, fields[1:]))
148        elif command == "snr":
149            self.channels[int(fields[1])] = float(fields[2])
150        elif command == "go":
151            self.go(int(fields[1]))
152        elif command == "status":
153            try:
154                code = fields[1]
155                self.status = {"no_fix": 0, "fix": 1, "dgps_fix": 2}[
156                    code.lower()]
157            except KeyError:
158                raise gpssimException("invalid status code '%s'" % code,
159                                      self.filename, self.lineno)
160        elif command == "mode":
161            try:
162                code = fields[1]
163                self.status = {"no_fix": 1, "2d": 2, "3d": 3}[code.lower()]
164            except KeyError:
165                raise gpssimException("invalid mode code '%s'" % code,
166                                      self.filename, self.lineno)
167        elif command == "satellites":
168            self.satellites_used = int(fields[1])
169        elif command == "validity":
170            self.validity = fields[1]
171        else:
172            raise gpssimException("unknown command '%s'" % fields[1],
173                                  self.filename, self.lineno)
174        # FIX-ME: add syntax for ephemeris elements
175        self.lineno += 1
176
177    def filter(self, inp, outp):
178        "Make this a filter for file-like objects."
179        self.filename = input.name
180        self.lineno = 1
181        self.output = outp
182        for line in inp:
183            self.execute(line)
184
185    def go(self, seconds):
186        "Run the simulation for a specified number of seconds."
187        for i in range(seconds):
188            next(self.ksv)
189            if self.have_ephemeris:
190                self.skyview = {}
191                for (prn, satellite) in list(self.ephemeris.items()):
192                    self.skyview[prn] = satellite.position(i)
193            self.output.write(self.gpstype.report(self))
194
195# Reporting classes need to have a report() method returning a string
196# that is a sentence (or possibly several sentences) reporting the
197# state of the simulation.  Presently we have only one, for NMEA
198# devices, but the point of the architecture is so that we could simulate
199# others - SirF, Evermore, whatever.
200
201
202MPS_TO_KNOTS = 1.9438445      # Meters per second to knots
203
204
205class NMEA(object):
206    "NMEA output generator."
207
208    def __init__(self):
209        self.sentences = ("RMC", "GGA",)
210        self.counter = 0
211
212    def add_checksum(self, mstr):
213        "Concatenate NMEA checksum and trailer to a string"
214        csum = 0
215        for (i, c) in enumerate(mstr):
216            if i == 0 and c == "$":
217                continue
218            csum ^= ord(c)
219        mstr += "*%02X\r\n" % csum
220        return mstr
221
222    def degtodm(self, angle):
223        "Decimal degrees to GPS-style, degrees first followed by minutes."
224        (fraction, _integer) = math.modf(angle)
225        return math.floor(angle) * 100 + fraction * 60
226
227    def GGA(self, sim):
228        "Emit GGA sentence describing the simulation state."
229        tm = time.gmtime(sim.ksv.time)
230        gga = "$GPGGA,%02d%02d%02d,%09.4f,%c,%010.4f,%c,%d,%02d," % (
231            tm.tm_hour,
232            tm.tm_min,
233            tm.tm_sec,
234            self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0],
235            self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0],
236            sim.status,
237            sim.satellites_used)
238        # HDOP calculation goes here
239        gga += ","
240        if sim.mode == gps.MODE_3D:
241            gga += "%.1f,M" % self.ksv.lat
242        gga += ","
243        gga += "%.3f,M," % gpslib.wg84_separation(sim.ksv.lat, sim.ksv.lon)
244        # Magnetic variation goes here
245        # gga += "%3.2f,M," % mag_var
246        gga += ",,"
247        # Time in seconds since last DGPS update goes here
248        gga += ","
249        # DGPS station ID goes here
250        return self.add_checksum(gga)
251
252    def GLL(self, sim):
253        "Emit GLL sentence describing the simulation state."
254        tm = time.gmtime(sim.ksv.time)
255        gll = "$GPLL,%09.4f,%c,%010.4f,%c,%02d%02d%02d,%s," % (
256            self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0],
257            self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0],
258            tm.tm_hour,
259            tm.tm_min,
260            tm.tm_sec,
261            sim.validity, )
262        # FAA mode indicator could go after these fields.
263        return self.add_checksum(gll)
264
265    def RMC(self, sim):
266        "Emit RMC sentence describing the simulation state."
267        tm = time.gmtime(sim.ksv.time)
268        rmc = \
269            "GPRMC,%02d%02d%02d,%s,%09.4f,%c,%010.4f,%c,%.1f,%02d%02d%02d," % (
270                tm.tm_hour,
271                tm.tm_min,
272                tm.tm_sec,
273                sim.validity,
274                self.degtodm(abs(sim.ksv.lat)), "SN"[sim.ksv.lat > 0],
275                self.degtodm(abs(sim.ksv.lon)), "WE"[sim.ksv.lon > 0],
276                sim.course * MPS_TO_KNOTS,
277                tm.tm_mday,
278                tm.tm_mon,
279                tm.tm_year % 100)
280        # Magnetic variation goes here
281        # rmc += "%3.2f,M," % mag_var
282        rmc += ",,"
283        # FAA mode goes here
284        return self.add_checksum(rmc)
285
286    def ZDA(self, sim):
287        "Emit ZDA sentence describing the simulation state."
288        tm = time.gmtime(sim.ksv.time)
289        zda = "$GPZDA,%02d%2d%02d,%02d,%02d,%04d" % (
290            tm.tm_hour,
291            tm.tm_min,
292            tm.tm_sec,
293            tm.tm_mday,
294            tm.tm_mon,
295            tm.tm_year, )
296        # Local zone description, 00 to +- 13 hours, goes here
297        zda += ","
298        # Local zone minutes description goes here
299        zda += ","
300        return self.add_checksum(zda)
301
302    def report(self, sim):
303        "Report the simulation state."
304        out = ""
305        for sentence in self.sentences:
306            if isinstance(sentence, tuple):
307                (interval, sentence) = sentence
308                if self.counter % interval:
309                    continue
310            out += getattr(self, sentence)(*[sim])
311        self.counter += 1
312        return out
313
314# The very simple main line.
315
316
317if __name__ == "__main__":
318    try:
319        gpssim(NMEA).filter(sys.stdin, sys.stdout)
320    except gpssimException as e:
321        sys.stderr.write(repr(e) + "\n")
322
323# gpssim.py ends here.
324