1#!/usr/bin/env python3 2 3# -*- coding: utf-8 -*- 4from __future__ import division 5from __future__ import print_function 6import traceback 7from math import pi, exp, atan, log, tan, sqrt 8import sys 9import os 10import json 11import threading 12from ast import literal_eval 13from collections import OrderedDict 14from asciimatics.event import KeyboardEvent 15from asciimatics.renderers import ColourImageFile 16from asciimatics.widgets import Effect, Button, Text, Layout, Frame, Divider, PopUpDialog 17from asciimatics.scene import Scene 18from asciimatics.screen import Screen 19from asciimatics.exceptions import ResizeScreenError, StopApplication, InvalidFields 20try: 21 import mapbox_vector_tile 22 import requests 23 from google.protobuf.message import DecodeError 24except ImportError: 25 print("Run `pip install mapbox-vector-tile protobuf requests` to fix your dependencies.") 26 print("See https://github.com/Toblerity/Shapely#installing-shapely-16b2 for Shapely install.") 27 sys.exit(0) 28 29# Global constants for the applications 30# Replace `_KEY` with the free one that you get from signing up with www.mapbox.com 31_KEY = "" 32_VECTOR_URL = \ 33 "http://a.tiles.mapbox.com/v4/mapbox.mapbox-streets-v7/{}/{}/{}.mvt?access_token={}" 34_IMAGE_URL = \ 35 "https://api.mapbox.com/styles/v1/mapbox/satellite-v9/tiles/256/{}/{}/{}?access_token={}" 36_START_SIZE = 64 37_ZOOM_IN_SIZE = _START_SIZE * 2 38_ZOOM_OUT_SIZE = _START_SIZE // 2 39_ZOOM_ANIMATION_STEPS = 6 40_ZOOM_STEP = exp(log(2) / _ZOOM_ANIMATION_STEPS) 41_CACHE_SIZE = 180 42_HELP = """ 43You can moved around using the cursor keys. To jump to any location in the world, press Enter and \ 44then fill in the longitude and latitude of the location and press 'OK'. 45 46To zoom in and out use '+'/'-'. To zoom all the way in/out, press '9'/'0'. 47 48To swap between satellite and vector views, press 'T'. To quit, press 'Q'. 49""" 50 51 52class EnterLocation(Frame): 53 """Form to enter a new desired location to display on the map.""" 54 def __init__(self, screen, longitude, latitude, on_ok): 55 super(EnterLocation, self).__init__( 56 screen, 7, 40, data={"long": str(longitude), "lat": str(latitude)}, name="loc", 57 title="Enter New Location", is_modal=True) 58 self._on_ok = on_ok 59 layout = Layout([1, 18, 1]) 60 self.add_layout(layout) 61 layout.add_widget(Divider(draw_line=False), 1) 62 layout.add_widget(Text(label="Longitude:", name="long", validator=r"^[-]?\d+?\.\d+?$"), 1) 63 layout.add_widget(Text(label="Latitude:", name="lat", validator=r"^[-]?\d+?\.\d+?$"), 1) 64 layout.add_widget(Divider(draw_line=False), 1) 65 layout2 = Layout([1, 1, 1]) 66 self.add_layout(layout2) 67 layout2.add_widget(Button("OK", self._ok), 1) 68 layout2.add_widget(Button("Cancel", self._cancel), 2) 69 self.fix() 70 71 def _ok(self): 72 try: 73 self.save(validate=True) 74 except InvalidFields: 75 return 76 self._on_ok(self) 77 self._scene.remove_effect(self) 78 79 def _cancel(self): 80 self._scene.remove_effect(self) 81 82 83class Map(Effect): 84 """Effect to display a satellite image or vector map of the world.""" 85 86 # Colour palettes 87 _256_PALETTE = { 88 "landuse": 193, 89 "water": 153, 90 "waterway": 153, 91 "marine_label": 12, 92 "admin": 7, 93 "country_label": 9, 94 "state_label": 1, 95 "place_label": 0, 96 "building": 252, 97 "road": 15, 98 "poi_label": 8 99 } 100 _16_PALETTE = { 101 "landuse": Screen.COLOUR_GREEN, 102 "water": Screen.COLOUR_BLUE, 103 "waterway": Screen.COLOUR_BLUE, 104 "marine_label": Screen.COLOUR_BLUE, 105 "admin": Screen.COLOUR_WHITE, 106 "country_label": Screen.COLOUR_RED, 107 "state_label": Screen.COLOUR_RED, 108 "place_label": Screen.COLOUR_YELLOW, 109 "building": Screen.COLOUR_WHITE, 110 "road": Screen.COLOUR_WHITE, 111 "poi_label": Screen.COLOUR_RED 112 } 113 114 def __init__(self, screen): 115 super(Map, self).__init__(screen) 116 # Current state of the map 117 self._screen = screen 118 self._zoom = 0 119 self._latitude = 51.4778 120 self._longitude = -0.0015 121 self._tiles = OrderedDict() 122 self._size = _START_SIZE 123 self._satellite = False 124 125 # Desired viewing location and animation flags 126 self._desired_zoom = self._zoom 127 self._desired_latitude = self._latitude 128 self._desired_longitude = self._longitude 129 self._next_update = 100000 130 131 # State for the background thread which reads in the tiles 132 self._running = True 133 self._updated = threading.Event() 134 self._updated.set() 135 self._oops = None 136 self._thread = threading.Thread(target=self._get_tiles) 137 self._thread.daemon = True 138 self._thread.start() 139 140 def _scale_coords(self, x, y, extent, xo, yo): 141 """Convert from tile coordinates to "pixels" - i.e. text characters.""" 142 return xo + (x * self._size * 2 / extent), yo + ((extent - y) * self._size / extent) 143 144 def _convert_longitude(self, longitude): 145 """Convert from longitude to the x position in overall map.""" 146 return int((180 + longitude) * (2 ** self._zoom) * self._size / 360) 147 148 def _convert_latitude(self, latitude): 149 """Convert from latitude to the y position in overall map.""" 150 return int((180 - (180 / pi * log(tan( 151 pi / 4 + latitude * pi / 360)))) * (2 ** self._zoom) * self._size / 360) 152 153 def _inc_lat(self, latitude, delta): 154 """Shift the latitude by the required number of pixels (i.e. text lines).""" 155 y = self._convert_latitude(latitude) 156 y += delta 157 return 360 / pi * atan( 158 exp((180 - y * 360 / (2 ** self._zoom) / self._size) * pi / 180)) - 90 159 160 def _get_satellite_tile(self, x_tile, y_tile, z_tile): 161 """Load up a single satellite image tile.""" 162 cache_file = "mapscache/{}.{}.{}.jpg".format(z_tile, x_tile, y_tile) 163 if cache_file not in self._tiles: 164 if not os.path.isfile(cache_file): 165 url = _IMAGE_URL.format(z_tile, x_tile, y_tile, _KEY) 166 data = requests.get(url).content 167 with open(cache_file, 'wb') as f: 168 f.write(data) 169 self._tiles[cache_file] = [ 170 x_tile, y_tile, z_tile, 171 ColourImageFile(self._screen, cache_file, height=_START_SIZE, dither=True, 172 uni=self._screen.unicode_aware), 173 True] 174 if len(self._tiles) > _CACHE_SIZE: 175 self._tiles.popitem(False) 176 self._screen.force_update() 177 178 def _get_vector_tile(self, x_tile, y_tile, z_tile): 179 """Load up a single vector tile.""" 180 cache_file = "mapscache/{}.{}.{}.json".format(z_tile, x_tile, y_tile) 181 if cache_file not in self._tiles: 182 if os.path.isfile(cache_file): 183 with open(cache_file, 'rb') as f: 184 tile = json.loads(f.read().decode('utf-8')) 185 else: 186 url = _VECTOR_URL.format(z_tile, x_tile, y_tile, _KEY) 187 data = requests.get(url).content 188 try: 189 tile = mapbox_vector_tile.decode(data) 190 with open(cache_file, mode='w') as f: 191 json.dump(literal_eval(repr(tile)), f) 192 except DecodeError: 193 tile = None 194 if tile: 195 self._tiles[cache_file] = [x_tile, y_tile, z_tile, tile, False] 196 if len(self._tiles) > _CACHE_SIZE: 197 self._tiles.popitem(False) 198 self._screen.force_update() 199 200 def _get_tiles(self): 201 """Background thread to download map tiles as required.""" 202 while self._running: 203 self._updated.wait() 204 self._updated.clear() 205 206 # Save off current view and find the nearest tile. 207 satellite = self._satellite 208 zoom = self._zoom 209 size = self._size 210 n = 2 ** zoom 211 x_offset = self._convert_longitude(self._longitude) 212 y_offset = self._convert_latitude(self._latitude) 213 214 # Get the visible tiles around that location - getting most relevant first 215 for x, y, z in [(0, 0, 0), (1, 0, 0), (0, 1, 0), (-1, 0, 0), (0, -1, 0), 216 (0, 0, -1), (0, 0, 1), 217 (1, 1, 0), (1, -1, 0), (-1, -1, 0), (-1, 1, 0)]: 218 # Restart if we've already zoomed to another level 219 if self._zoom != zoom: 220 break 221 222 # Don't get tile if it falls off the grid 223 x_tile = int(x_offset // size) + x 224 y_tile = int(y_offset // size) + y 225 z_tile = zoom + z 226 if (x_tile < 0 or x_tile >= n or y_tile < 0 or y_tile >= n or 227 z_tile < 0 or z_tile > 20): 228 continue 229 # noinspection PyBroadException 230 try: 231 232 # Don't bother rendering if the tile is not visible 233 top = y_tile * size - y_offset + self._screen.height // 2 234 left = (x_tile * size - x_offset + self._screen.width // 4) * 2 235 if z == 0 and (left > self._screen.width or left + self._size * 2 < 0 or 236 top > self._screen.height or top + self._size < 0): 237 continue 238 239 if satellite: 240 self._get_satellite_tile(x_tile, y_tile, z_tile) 241 else: 242 self._get_vector_tile(x_tile, y_tile, z_tile) 243 # pylint: disable=broad-except 244 except Exception: 245 self._oops = "{} - tile loc: {} {} {}".format( 246 traceback.format_exc(), x_tile, y_tile, z_tile) 247 248 # Generally refresh screen after we've downloaded everything 249 self._screen.force_update() 250 251 def _get_features(self): 252 """Decide which layers to render based on current zoom level and view type.""" 253 if self._satellite: 254 return [("water", [], [])] 255 elif self._zoom <= 2: 256 return [ 257 ("water", [], []), 258 ("marine_label", [], [1]), 259 ] 260 elif self._zoom <= 7: 261 return [ 262 ("admin", [], []), 263 ("water", [], []), 264 ("road", ["motorway"], []), 265 ("country_label", [], []), 266 ("marine_label", [], [1]), 267 ("state_label", [], []), 268 ("place_label", [], ["city", "town"]), 269 ] 270 elif self._zoom <= 10: 271 return [ 272 ("admin", [], []), 273 ("water", [], []), 274 ("road", ["motorway", "motorway_link", "trunk"], []), 275 ("country_label", [], []), 276 ("marine_label", [], [1]), 277 ("state_label", [], []), 278 ("place_label", [], ["city", "town"]), 279 ] 280 else: 281 return [ 282 ("landuse", ["agriculture", "grass", "park"], []), 283 ("water", [], []), 284 ("waterway", ["river", "canal"], []), 285 ("building", [], []), 286 ("road", 287 ["motorway", "motorway_link", "trunk", "primary", "secondary"] 288 if self._zoom <= 14 else 289 ["motorway", "motorway_link", "trunk", "primary", "secondary", "tertiary", 290 "link", "street", "tunnel"], 291 []), 292 ("poi_label", [], []), 293 ] 294 295 def _draw_lines_internal(self, coords, colour, bg): 296 """Helper to draw lines connecting a set of nodes that are scaled for the Screen.""" 297 for i, (x, y) in enumerate(coords): 298 if i == 0: 299 self._screen.move(x, y) 300 else: 301 self._screen.draw(x, y, colour=colour, bg=bg, thin=True) 302 303 def _draw_polygons(self, feature, bg, colour, extent, polygons, xo, yo): 304 """Draw a set of polygons from a vector tile.""" 305 coords = [] 306 for polygon in polygons: 307 coords.append([self._scale_coords(x, y, extent, xo, yo) for x, y in polygon]) 308 # Polygons are expensive to draw and the buildings layer is huge - so we convert to 309 # lines in order to process updates fast enough to animate. 310 if "type" in feature["properties"] and "building" in feature["properties"]["type"]: 311 for line in coords: 312 self._draw_lines_internal(line, colour, bg) 313 else: 314 self._screen.fill_polygon(coords, colour=colour, bg=bg) 315 316 def _draw_lines(self, bg, colour, extent, line, xo, yo): 317 """Draw a set of lines from a vector tile.""" 318 coords = [self._scale_coords(x, y, extent, xo, yo) for x, y in line] 319 self._draw_lines_internal(coords, colour, bg) 320 321 def _draw_feature(self, feature, extent, colour, bg, xo, yo): 322 """Draw a single feature from a layer in a vector tile.""" 323 geometry = feature["geometry"] 324 if geometry["type"] == "Polygon": 325 self._draw_polygons(feature, bg, colour, extent, geometry["coordinates"], xo, yo) 326 elif feature["geometry"]["type"] == "MultiPolygon": 327 for multi_polygon in geometry["coordinates"]: 328 self._draw_polygons(feature, bg, colour, extent, multi_polygon, xo, yo) 329 elif feature["geometry"]["type"] == "LineString": 330 self._draw_lines(bg, colour, extent, geometry["coordinates"], xo, yo) 331 elif feature["geometry"]["type"] == "MultiLineString": 332 for line in geometry["coordinates"]: 333 self._draw_lines(bg, colour, extent, line, xo, yo) 334 elif feature["geometry"]["type"] == "Point": 335 x, y = self._scale_coords( 336 geometry["coordinates"][0], geometry["coordinates"][1], extent, xo, yo) 337 text = u" {} ".format(feature["properties"]["name_en"]) 338 self._screen.print_at(text, int(x - len(text) / 2), int(y), colour=colour, bg=bg) 339 340 def _draw_tile_layer(self, tile, layer_name, c_filters, colour, t_filters, x, y, bg): 341 """Draw the visible geometry in the specified map tile.""" 342 # Don't bother rendering if the tile is not visible 343 left = (x + self._screen.width // 4) * 2 344 top = y + self._screen.height // 2 345 if (left > self._screen.width or left + self._size * 2 < 0 or 346 top > self._screen.height or top + self._size < 0): 347 return 0 348 349 # Not all layers are available in every tile. 350 try: 351 _layer = tile[layer_name] 352 _extent = float(_layer["extent"]) 353 except KeyError: 354 return 0 355 356 for _feature in _layer["features"]: 357 try: 358 if c_filters and _feature["properties"]["class"] not in c_filters: 359 continue 360 if (t_filters and _feature["type"] not in t_filters and 361 _feature["properties"]["type"] not in t_filters): 362 continue 363 self._draw_feature( 364 _feature, _extent, colour, bg, 365 (x + self._screen.width // 4) * 2, y + self._screen.height // 2) 366 except KeyError: 367 pass 368 return 1 369 370 def _draw_satellite_tile(self, tile, x, y): 371 """Draw a satellite image tile to screen.""" 372 image, colours = tile.rendered_text 373 for (i, line) in enumerate(image): 374 self._screen.paint(line, x, y + i, colour_map=colours[i]) 375 return 1 376 377 def _draw_tiles(self, x_offset, y_offset, bg): 378 """Render all visible tiles a layer at a time.""" 379 count = 0 380 for layer_name, c_filters, t_filters in self._get_features(): 381 colour = (self._256_PALETTE[layer_name] 382 if self._screen.colours >= 256 else self._16_PALETTE[layer_name]) 383 for x, y, z, tile, satellite in sorted(self._tiles.values(), key=lambda k: k[0]): 384 # Don't draw the wrong type or zoom of tile. 385 if satellite != self._satellite or z != self._zoom: 386 continue 387 388 # Convert tile location into pixels and draw the tile. 389 x *= self._size 390 y *= self._size 391 if satellite: 392 count += self._draw_satellite_tile( 393 tile, 394 int((x-x_offset + self._screen.width // 4) * 2), 395 int(y-y_offset + self._screen.height // 2)) 396 else: 397 count += self._draw_tile_layer(tile, layer_name, c_filters, colour, t_filters, 398 x - x_offset, y - y_offset, bg) 399 return count 400 401 def _zoom_map(self, zoom_out=True): 402 """Animate the zoom in/out as appropriate for the displayed map tile.""" 403 size_step = 1 / _ZOOM_STEP if zoom_out else _ZOOM_STEP 404 self._next_update = 1 405 if self._satellite: 406 size_step **= _ZOOM_ANIMATION_STEPS 407 self._size *= size_step 408 if self._size <= _ZOOM_OUT_SIZE: 409 if self._zoom > 0: 410 self._zoom -= 1 411 self._size = _START_SIZE 412 else: 413 self._size = _ZOOM_OUT_SIZE 414 elif self._size >= _ZOOM_IN_SIZE: 415 if self._zoom < 20: 416 self._zoom += 1 417 self._size = _START_SIZE 418 else: 419 self._size = _ZOOM_IN_SIZE 420 421 def _move_to_desired_location(self): 422 """Animate movement to desired location on map.""" 423 self._next_update = 100000 424 x_start = self._convert_longitude(self._longitude) 425 y_start = self._convert_latitude(self._latitude) 426 x_end = self._convert_longitude(self._desired_longitude) 427 y_end = self._convert_latitude(self._desired_latitude) 428 if sqrt((x_end - x_start) ** 2 + (y_end - y_start) ** 2) > _START_SIZE // 4: 429 self._zoom_map(True) 430 elif self._zoom != self._desired_zoom: 431 self._zoom_map(self._desired_zoom < self._zoom) 432 if self._longitude != self._desired_longitude: 433 self._next_update = 1 434 if self._desired_longitude < self._longitude: 435 self._longitude = max(self._longitude - 360 / 2 ** self._zoom / self._size * 2, 436 self._desired_longitude) 437 else: 438 self._longitude = min(self._longitude + 360 / 2 ** self._zoom / self._size * 2, 439 self._desired_longitude) 440 if self._latitude != self._desired_latitude: 441 self._next_update = 1 442 if self._desired_latitude < self._latitude: 443 self._latitude = max(self._inc_lat(self._latitude, 2), self._desired_latitude) 444 else: 445 self._latitude = min(self._inc_lat(self._latitude, -2), self._desired_latitude) 446 if self._next_update == 1: 447 self._updated.set() 448 449 def _update(self, frame_no): 450 """Draw the latest set of tiles to the Screen.""" 451 # Check for any fatal errors from the background thread and quit if we hit anything. 452 if self._oops: 453 raise RuntimeError(self._oops) 454 455 # Calculate new positions for animated movement. 456 self._move_to_desired_location() 457 458 # Re-draw the tiles - if we have any suitable ones downloaded. 459 count = 0 460 x_offset = self._convert_longitude(self._longitude) 461 y_offset = self._convert_latitude(self._latitude) 462 if self._tiles: 463 # Clear the area first. 464 bg = 253 if self._screen.unicode_aware and self._screen.colours >= 256 else 0 465 for y in range(self._screen.height): 466 self._screen.print_at("." * self._screen.width, 0, y, colour=bg, bg=bg) 467 468 # Now draw all the available tiles. 469 count = self._draw_tiles(x_offset, y_offset, bg) 470 471 # Just a few pointers on what the user should do... 472 if count == 0: 473 self._screen.centre(" Loading - please wait... ", self._screen.height // 2, 1) 474 475 self._screen.centre("Press '?' for help.", 0, 1) 476 if _KEY == "": 477 footer = "Using local cached data - go to https://www.mapbox.com/ and get a free key." 478 else: 479 footer = u"Zoom: {} Location: {:.6}, {:.6} Maps: © Mapbox, © OpenStreetMap".format( 480 self._zoom, self._longitude, self._latitude) 481 self._screen.centre(footer, self._screen.height - 1, 1) 482 483 return count 484 485 def process_event(self, event): 486 """User input for the main map view.""" 487 if isinstance(event, KeyboardEvent): 488 if event.key_code in [Screen.ctrl("m"), Screen.ctrl("j")]: 489 self._scene.add_effect( 490 EnterLocation( 491 self._screen, self._longitude, self._latitude, self._on_new_location)) 492 elif event.key_code in [ord('q'), ord('Q'), Screen.ctrl("c")]: 493 raise StopApplication("User quit") 494 elif event.key_code in [ord('t'), ord('T')]: 495 self._satellite = not self._satellite 496 if self._satellite: 497 self._size = _START_SIZE 498 elif event.key_code == ord("?"): 499 self._scene.add_effect(PopUpDialog(self._screen, _HELP, ["OK"])) 500 elif event.key_code == ord("+") and self._zoom <= 20: 501 if self._desired_zoom < 20: 502 self._desired_zoom += 1 503 elif event.key_code == ord("-") and self._zoom >= 0: 504 if self._desired_zoom > 0: 505 self._desired_zoom -= 1 506 elif event.key_code == ord("0"): 507 self._desired_zoom = 0 508 elif event.key_code == ord("9"): 509 self._desired_zoom = 20 510 elif event.key_code == Screen.KEY_LEFT: 511 self._desired_longitude -= 360 / 2 ** self._zoom / self._size * 10 512 elif event.key_code == Screen.KEY_RIGHT: 513 self._desired_longitude += 360 / 2 ** self._zoom / self._size * 10 514 elif event.key_code == Screen.KEY_UP: 515 self._desired_latitude = self._inc_lat(self._desired_latitude, -self._size / 10) 516 elif event.key_code == Screen.KEY_DOWN: 517 self._desired_latitude = self._inc_lat(self._desired_latitude, self._size / 10) 518 else: 519 return 520 521 # Trigger a reload of the tiles and redraw map 522 self._updated.set() 523 self._screen.force_update() 524 525 def _on_new_location(self, form): 526 """Set a new desired location entered in the pop-up form.""" 527 self._desired_longitude = float(form.data["long"]) 528 self._desired_latitude = float(form.data["lat"]) 529 self._desired_zoom = 13 530 self._screen.force_update() 531 532 # noinspection PyUnusedLocal 533 # pylint: disable=unused-argument 534 def clone(self, new_screen, new_scene): 535 # On resize, there will be a new Map - kill the thread in this one. 536 self._running = False 537 self._updated.set() 538 539 @property 540 def frame_update_count(self): 541 # Only redraw if required - as determined by the update logic. 542 return self._next_update 543 544 @property 545 def stop_frame(self): 546 # No specific end point for this Effect. Carry on running forever. 547 return 0 548 549 def reset(self): 550 # Nothing special to do. Just need this to satisfy the ABC. 551 pass 552 553 554def demo(screen, scene): 555 screen.play([Scene([Map(screen)], -1)], stop_on_resize=True, start_scene=scene) 556 557 558if __name__ == "__main__": 559 last_scene = None 560 while True: 561 try: 562 Screen.wrapper(demo, catch_interrupt=False, arguments=[last_scene]) 563 sys.exit(0) 564 except ResizeScreenError as e: 565 last_scene = e.scene 566