#!/usr/local/bin/python3.8
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2015 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio
import ast
import os
import time
import traceback
import sys
import threading
from typing import Dict, Optional, Tuple, Iterable, Callable, Union, Sequence, Mapping, TYPE_CHECKING
from base64 import b64decode, b64encode
from collections import defaultdict
import json

import aiohttp
from aiohttp import web, client_exceptions
from aiorpcx import TaskGroup, timeout_after, TaskTimeout, ignore_after

from . import util
from .network import Network
from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
from .invoices import PR_PAID, PR_EXPIRED
from .util import log_exceptions, ignore_exceptions, randrange
from .wallet import Wallet, Abstract_Wallet
from .storage import WalletStorage
from .wallet_db import WalletDB
from .commands import known_commands, Commands
from .simple_config import SimpleConfig
from .exchange_rate import FxThread
from .logging import get_logger, Logger

if TYPE_CHECKING:
    from electrum import gui


_logger = get_logger(__name__)


class DaemonNotRunning(Exception):
    """Raised when the lockfile cannot be read or the daemon cannot be reached."""
    pass


def get_lockfile(config: SimpleConfig):
    """Return the path of the daemon lockfile inside ``config.path``."""
    return os.path.join(config.path, 'daemon')


def remove_lockfile(lockfile):
    """Delete the lockfile at the given path."""
    os.unlink(lockfile)


def get_file_descriptor(config: SimpleConfig):
    """Try to take the daemon lock.

    Tries to create the lockfile, using O_EXCL to prevent races.
    If it succeeds it returns the FD.
    Otherwise try and connect to the server specified in the lockfile
    by sending it a 'ping'. If this succeeds, None is returned.
    Otherwise remove the lockfile and try again.
    """
    lockfile = get_lockfile(config)
    while True:
        try:
            return os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
        except OSError:
            pass
        try:
            request(config, 'ping')
            return None
        except DaemonNotRunning:
            # Couldn't connect; remove lockfile and try again.
            remove_lockfile(lockfile)


def request(config: SimpleConfig, endpoint, args=(), timeout=60):
    """Send a JSON-RPC request to the daemon described by the lockfile.

    The lockfile is expected to contain the repr of
    ``((host, port), create_time)``. The request is scheduled on the
    event loop returned by ``asyncio.get_event_loop()`` and this function
    blocks on its result for at most ``timeout`` seconds.

    Raises DaemonNotRunning if the lockfile cannot be read/parsed, or if
    the connection fails and the lockfile is missing a creation time or
    is older than one second.
    """
    lockfile = get_lockfile(config)
    while True:
        create_time = None
        try:
            with open(lockfile) as f:
                (host, port), create_time = ast.literal_eval(f.read())
        except Exception:
            raise DaemonNotRunning()
        rpc_user, rpc_password = get_rpc_credentials(config)
        server_url = 'http://%s:%d' % (host, port)
        auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
        loop = asyncio.get_event_loop()

        async def request_coroutine():
            async with aiohttp.ClientSession(auth=auth) as session:
                c = util.JsonRPCClient(session, server_url)
                return await c.request(endpoint, *args)

        try:
            fut = asyncio.run_coroutine_threadsafe(request_coroutine(), loop)
            return fut.result(timeout=timeout)
        except aiohttp.client_exceptions.ClientConnectorError as e:
            _logger.info(f"failed to connect to JSON-RPC server {e}")
            if not create_time or create_time < time.time() - 1.0:
                raise DaemonNotRunning()
        # Sleep a bit and try again; it might have just been started
        time.sleep(1.0)


def get_rpc_credentials(config: SimpleConfig) -> Tuple[str, str]:
    """Return the (user, password) pair used for JSON-RPC authentication.

    Empty strings in the config are treated as unset. If either value is
    unset, the user is set to 'user', a password is generated from a
    128-bit random integer (URL-safe base64), and both are written back
    to the config.
    """
    rpc_user = config.get('rpcuser', None)
    rpc_password = config.get('rpcpassword', None)
    if rpc_user == '':
        rpc_user = None
    if rpc_password == '':
        rpc_password = None
    if rpc_user is None or rpc_password is None:
        rpc_user = 'user'
        bits = 128
        nbytes = bits // 8 + (bits % 8 > 0)
        pw_int = randrange(pow(2, bits))
        pw_b64 = b64encode(
            pw_int.to_bytes(nbytes, 'big'), b'-_')
        rpc_password = to_string(pw_b64, 'ascii')
        config.set_key('rpcuser', rpc_user)
        config.set_key('rpcpassword', rpc_password, save=True)
    return rpc_user, rpc_password


class AuthenticationError(Exception):
    """Base class for authentication failures raised by AuthenticatedServer."""
    pass


class AuthenticationInvalidOrMissing(AuthenticationError):
    """The Authorization header is absent or not of the 'Basic' type."""
    pass


class AuthenticationCredentialsInvalid(AuthenticationError):
    """The supplied username/password do not match the configured ones."""
    pass


class AuthenticatedServer(Logger):
    """HTTP handler that dispatches JSON requests to registered coroutines.

    Requests are authenticated with HTTP Basic auth against the
    user/password given to the constructor, unless the password is the
    empty string.
    """

    def __init__(self, rpc_user, rpc_password):
        Logger.__init__(self)
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password
        self.auth_lock = asyncio.Lock()
        self._methods = {}  # type: Dict[str, Callable]

    def register_method(self, f):
        """Expose callable ``f`` under its ``__name__``; names must be unique."""
        assert f.__name__ not in self._methods, f"name collision for {f.__name__}"
        self._methods[f.__name__] = f

    async def authenticate(self, headers):
        """Check the 'Authorization' header against the configured credentials.

        Raises AuthenticationInvalidOrMissing if the header is missing or
        is not 'Basic', and AuthenticationCredentialsInvalid (after a
        50 ms sleep) if the username or password does not match.
        """
        if self.rpc_password == '':
            # RPC authentication is disabled
            return
        auth_string = headers.get('Authorization', None)
        if auth_string is None:
            raise AuthenticationInvalidOrMissing('CredentialsMissing')
        basic, _, encoded = auth_string.partition(' ')
        if basic != 'Basic':
            raise AuthenticationInvalidOrMissing('UnsupportedType')
        encoded = to_bytes(encoded, 'utf8')
        credentials = to_string(b64decode(encoded), 'utf8')
        username, _, password = credentials.partition(':')
        if not (constant_time_compare(username, self.rpc_user)
                and constant_time_compare(password, self.rpc_password)):
            await asyncio.sleep(0.050)
            raise AuthenticationCredentialsInvalid('Invalid Credentials')

    async def handle(self, request):
        """Authenticate, decode and execute one request.

        Returns a 401/403 response on authentication failure, a 500
        'Invalid Request' response if the body cannot be decoded or names
        an unregistered method, and otherwise a JSON response holding
        either 'result' or an 'error' object with the exception message.
        """
        # only the authentication step is serialized by the lock
        async with self.auth_lock:
            try:
                await self.authenticate(request.headers)
            except AuthenticationInvalidOrMissing:
                return web.Response(headers={"WWW-Authenticate": "Basic realm=Electrum"},
                                    text='Unauthorized', status=401)
            except AuthenticationCredentialsInvalid:
                return web.Response(text='Forbidden', status=403)
        try:
            request = await request.text()
            request = json.loads(request)
            method = request['method']
            _id = request['id']
            params = request.get('params', [])  # type: Union[Sequence, Mapping]
            if method not in self._methods:
                raise Exception(f"attempting to use unregistered method: {method}")
            f = self._methods[method]
        except Exception:
            self.logger.exception("invalid request")
            return web.Response(text='Invalid Request', status=500)
        response = {
            'id': _id,
            'jsonrpc': '2.0',
        }
        try:
            # mapping params are passed as keyword arguments, anything else positionally
            if isinstance(params, dict):
                response['result'] = await f(**params)
            else:
                response['result'] = await f(*params)
        except BaseException as e:
            self.logger.exception("internal error while executing RPC")
            response['error'] = {
                'code': 1,
                'message': str(e),
            }
        return web.json_response(response)


class CommandsServer(AuthenticatedServer):
    """Authenticated server exposing 'ping', 'gui', 'run_cmdline' and all known commands."""

    def __init__(self, daemon, fd):
        rpc_user, rpc_password = get_rpc_credentials(daemon.config)
        AuthenticatedServer.__init__(self, rpc_user, rpc_password)
        self.daemon = daemon
        self.fd = fd
        self.config = daemon.config
        self.host = self.config.get('rpchost', '127.0.0.1')
        self.port = self.config.get('rpcport', 0)
        self.app = web.Application()
        self.app.router.add_post("/", self.handle)
        self.register_method(self.ping)
        self.register_method(self.gui)
        self.cmd_runner = Commands(config=self.config, network=self.daemon.network, daemon=self.daemon)
        for cmdname in known_commands:
            self.register_method(getattr(self.cmd_runner, cmdname))
        self.register_method(self.run_cmdline)

    async def run(self):
        """Start listening, then write ``(sockname, time)`` to the lock FD and close it."""
        self.runner = web.AppRunner(self.app)
        await self.runner.setup()
        site = web.TCPSite(self.runner, self.host, self.port)
        await site.start()
        socket = site._server.sockets[0]
        # this is the content that request() later parses from the lockfile
        os.write(self.fd, bytes(repr((socket.getsockname(), time.time())), 'utf8'))
        os.close(self.fd)

    async def ping(self):
        """Liveness check; always returns True."""
        return True

    async def gui(self, config_options):
        """Ask the running GUI to open a new window; return a status string."""
        if self.daemon.gui_object:
            if hasattr(self.daemon.gui_object, 'new_window'):
                path = self.config.get_wallet_path(use_gui_last_wallet=True)
                self.daemon.gui_object.new_window(path, config_options.get('url'))
                response = "ok"
            else:
                response = "error: current GUI does not support multiple windows"
        else:
            response = "Error: Electrum is running in daemon mode. Please stop the daemon first."
        return response

    async def run_cmdline(self, config_options):
        """Run the command named by ``config_options['cmd']``.

        Positional arguments and options are looked up in
        ``config_options``. If the command raises, the result is a dict
        ``{'error': <message>}`` instead of the exception propagating.
        """
        cmdname = config_options['cmd']
        cmd = known_commands[cmdname]
        # arguments passed to function
        args = [config_options.get(x) for x in cmd.params]
        # decode json arguments
        args = [json_decode(i) for i in args]
        # options
        kwargs = {}
        for x in cmd.options:
            kwargs[x] = config_options.get(x)
        if 'wallet_path' in cmd.options:
            kwargs['wallet_path'] = config_options.get('wallet_path')
        elif 'wallet' in cmd.options:
            kwargs['wallet'] = config_options.get('wallet_path')
        func = getattr(self.cmd_runner, cmd.name)
        # fixme: not sure how to retrieve message in jsonrpcclient
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            result = {'error': str(e)}
        return result


class WatchTowerServer(AuthenticatedServer):
    """Authenticated server exposing 'get_ctn' and 'add_sweep_tx' of the local watchtower."""

    def __init__(self, network, netaddress):
        self.addr = netaddress
        self.config = network.config
        self.network = network
        watchtower_user = self.config.get('watchtower_user', '')
        watchtower_password = self.config.get('watchtower_password', '')
        AuthenticatedServer.__init__(self, watchtower_user, watchtower_password)
        self.lnwatcher = network.local_watchtower
        self.app = web.Application()
        self.app.router.add_post("/", self.handle)
        self.register_method(self.get_ctn)
        self.register_method(self.add_sweep_tx)

    async def run(self):
        """Start listening on the configured address using the config's SSL context."""
        self.runner = web.AppRunner(self.app)
        await self.runner.setup()
        site = web.TCPSite(self.runner, host=str(self.addr.host), port=self.addr.port, ssl_context=self.config.get_ssl_context())
        await site.start()

    async def get_ctn(self, *args):
        """Forward to ``lnwatcher.sweepstore.get_ctn``."""
        return await self.lnwatcher.sweepstore.get_ctn(*args)

    async def add_sweep_tx(self, *args):
        """Forward to ``lnwatcher.sweepstore.add_sweep_tx``."""
        return await self.lnwatcher.sweepstore.add_sweep_tx(*args)


class PayServer(Logger):
    """HTTP server for payment requests: invoice lookup, status websocket, BIP70."""

    def __init__(self, daemon: 'Daemon', netaddress):
        Logger.__init__(self)
        self.addr = netaddress
        self.daemon = daemon
        self.config = daemon.config
        # URL prefix of the static pages; kept on the instance because both
        # run() and create_request() need it.
        self.root = self.config.get('payserver_root', '/r')
        # request key -> event that is set once that request is paid
        self.pending = defaultdict(asyncio.Event)
        util.register_callback(self.on_payment, ['request_status'])

    @property
    def wallet(self):
        # FIXME specify wallet somehow?
        return list(self.daemon.get_wallets().values())[0]

    async def on_payment(self, evt, wallet, key, status):
        """'request_status' callback: wake up waiters once ``key`` is paid."""
        if status == PR_PAID:
            self.pending[key].set()

    @ignore_exceptions
    @log_exceptions
    async def run(self):
        """Register the routes and start listening on the configured address."""
        root = self.root
        app = web.Application()
        app.add_routes([web.get('/api/get_invoice', self.get_request)])
        app.add_routes([web.get('/api/get_status', self.get_status)])
        app.add_routes([web.get('/bip70/{key}.bip70', self.get_bip70_request)])
        app.add_routes([web.static(root, os.path.join(os.path.dirname(__file__), 'www'))])
        if self.config.get('payserver_allow_create_invoice'):
            app.add_routes([web.post('/api/create_invoice', self.create_request)])
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, host=str(self.addr.host), port=self.addr.port, ssl_context=self.config.get_ssl_context())
        await site.start()

    async def create_request(self, request):
        """Create a lightning request from form data and redirect to its pay page.

        Expects a decimal 'amount_sat' field; 'message' is optional and
        defaults to "donation". Raises HTTPUnsupportedMediaType if
        'amount_sat' is missing or not made of digits, and HTTPFound
        (redirect) on success.
        """
        params = await request.post()
        wallet = self.wallet
        if 'amount_sat' not in params or not params['amount_sat'].isdigit():
            raise web.HTTPUnsupportedMediaType()
        amount = int(params['amount_sat'])
        # .get(): a form without a 'message' field falls back to the default
        message = params.get('message') or "donation"
        payment_hash = wallet.lnworker.add_request(
            amount_sat=amount,
            message=message,
            expiry=3600)
        key = payment_hash.hex()
        raise web.HTTPFound(self.root + '/pay?id=' + key)

    async def get_request(self, r):
        """Return the formatted request whose key is the raw query string, as JSON."""
        key = r.query_string
        request = self.wallet.get_formatted_request(key)
        return web.json_response(request)

    async def get_bip70_request(self, r):
        """Serve the request identified by the URL key as a serialized payment request."""
        from .paymentrequest import make_request
        key = r.match_info['key']
        request = self.wallet.get_request(key)
        if not request:
            return web.HTTPNotFound()
        pr = make_request(self.config, request)
        return web.Response(body=pr.SerializeToString(), content_type='application/bitcoin-paymentrequest')

    async def get_status(self, request):
        """Websocket endpoint reporting the status of the request named by the query string.

        Sends 'unknown invoice', 'paid' or 'expired' and closes right
        away when the status is already final; otherwise sends 'waiting'
        about once per second until the request is paid, then 'paid'.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        key = request.query_string
        info = self.wallet.get_formatted_request(key)
        if not info:
            await ws.send_str('unknown invoice')
            await ws.close()
            return ws
        if info.get('status') == PR_PAID:
            await ws.send_str('paid')
            await ws.close()
            return ws
        if info.get('status') == PR_EXPIRED:
            await ws.send_str('expired')
            await ws.close()
            return ws
        while True:
            try:
                await asyncio.wait_for(self.pending[key].wait(), 1)
                break
            except asyncio.TimeoutError:
                # send data on the websocket, to keep it alive
                await ws.send_str('waiting')
        await ws.send_str('paid')
        await ws.close()
        return ws


class Daemon(Logger):
    """Owns the network, the loaded wallets and the optional RPC/pay/watchtower servers."""

    network: Optional[Network]
    gui_object: Optional[Union['gui.qt.ElectrumGui', 'gui.kivy.ElectrumGui']]

    @profiler
    def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
        """Set up the daemon and schedule its jobs on the current event loop.

        If ``listen_jsonrpc`` is true and no ``fd`` is given, the daemon
        lock is taken via get_file_descriptor(); an Exception is raised
        if that returns None.
        """
        Logger.__init__(self)
        self.config = config
        self.listen_jsonrpc = listen_jsonrpc
        if fd is None and listen_jsonrpc:
            fd = get_file_descriptor(config)
            if fd is None:
                raise Exception('failed to lock daemon; already running?')
        if 'wallet_path' in config.cmdline_options:
            self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
                                "Use the load_wallet command instead.")
        self.asyncio_loop = asyncio.get_event_loop()
        self.network = None
        if not config.get('offline'):
            self.network = Network(config, daemon=self)
        self.fx = FxThread(config, self.network)
        self.gui_object = None
        # path -> wallet;   make sure path is standardized.
        self._wallets = {}  # type: Dict[str, Abstract_Wallet]
        daemon_jobs = []
        # Setup commands server
        self.commands_server = None
        if listen_jsonrpc:
            self.commands_server = CommandsServer(self, fd)
            daemon_jobs.append(self.commands_server.run())
        # pay server
        self.pay_server = None
        payserver_address = self.config.get_netaddress('payserver_address')
        if not config.get('offline') and payserver_address:
            self.pay_server = PayServer(self, payserver_address)
            daemon_jobs.append(self.pay_server.run())
        # server-side watchtower
        self.watchtower = None
        watchtower_address = self.config.get_netaddress('watchtower_address')
        if not config.get('offline') and watchtower_address:
            self.watchtower = WatchTowerServer(self.network, watchtower_address)
            # append the coroutine itself, like the other servers above
            daemon_jobs.append(self.watchtower.run())
        if self.network:
            self.network.start(jobs=[self.fx.run])
            # prepare lightning functionality, also load channel db early
            if self.config.get('use_gossip', False):
                self.network.start_gossip()

        self.stopping_soon = threading.Event()
        self.stopped_event = asyncio.Event()
        self.taskgroup = TaskGroup()
        asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)

    @log_exceptions
    async def _run(self, jobs: Iterable = None):
        """Run ``jobs`` in the daemon taskgroup until it is cancelled or dies.

        ``stopping_soon`` is set when the taskgroup exits, for any reason.
        """
        if jobs is None:
            jobs = []
        self.logger.info("starting taskgroup.")
        try:
            async with self.taskgroup as group:
                for job in jobs:
                    await group.spawn(job)
                await group.spawn(asyncio.Event().wait)  # run forever (until cancel)
        except asyncio.CancelledError:
            raise
        except Exception:
            self.logger.exception("taskgroup died.")
        finally:
            self.logger.info("taskgroup stopped.")
            self.stopping_soon.set()

    def load_wallet(self, path, password, *, manual_upgrades=True) -> Optional[Abstract_Wallet]:
        """Return the wallet at ``path``, loading and registering it if needed.

        Returns None if the file does not exist, if it is encrypted and
        no password was given, or if the wallet db requires a split, an
        upgrade or a pending action.
        """
        path = standardize_path(path)
        # wizard will be launched if we return
        if path in self._wallets:
            wallet = self._wallets[path]
            return wallet
        storage = WalletStorage(path)
        if not storage.file_exists():
            return
        if storage.is_encrypted():
            if not password:
                return
            storage.decrypt(password)
        # read data, pass it to db
        db = WalletDB(storage.read(), manual_upgrades=manual_upgrades)
        if db.requires_split():
            return
        if db.requires_upgrade():
            return
        if db.get_action():
            return
        wallet = Wallet(db, storage, config=self.config)
        wallet.start_network(self.network)
        self._wallets[path] = wallet
        return wallet

    def add_wallet(self, wallet: Abstract_Wallet) -> None:
        """Register an already constructed wallet under its standardized storage path."""
        path = wallet.storage.path
        path = standardize_path(path)
        self._wallets[path] = wallet

    def get_wallet(self, path: str) -> Optional[Abstract_Wallet]:
        """Return the registered wallet for ``path``, or None."""
        path = standardize_path(path)
        return self._wallets.get(path)

    def get_wallets(self) -> Dict[str, Abstract_Wallet]:
        """Return a copy of the path -> wallet mapping."""
        return dict(self._wallets)  # copy

    def delete_wallet(self, path: str) -> bool:
        """Stop the wallet at ``path`` and delete its file; True iff a file was removed."""
        self.stop_wallet(path)
        if os.path.exists(path):
            os.unlink(path)
            return True
        return False

    def stop_wallet(self, path: str) -> bool:
        """Returns True iff a wallet was found."""
        # note: this must not be called from the event loop. # TODO raise if so
        fut = asyncio.run_coroutine_threadsafe(self._stop_wallet(path), self.asyncio_loop)
        return fut.result()

    async def _stop_wallet(self, path: str) -> bool:
        """Returns True iff a wallet was found."""
        path = standardize_path(path)
        wallet = self._wallets.pop(path, None)
        if not wallet:
            return False
        await wallet.stop()
        return True

    def run_daemon(self):
        """Block until ``stopping_soon`` is set (or Ctrl-C), then shut down."""
        try:
            self.stopping_soon.wait()
        except KeyboardInterrupt:
            self.stopping_soon.set()
        self.on_stop()

    async def stop(self):
        """Request shutdown and wait until on_stop() has finished."""
        self.stopping_soon.set()
        await self.stopped_event.wait()

    def on_stop(self):
        """Stop GUI, wallets, network and taskgroup, then release the lockfile.

        The lockfile removal and the ``stopped_event`` notification happen
        in a ``finally`` clause, so they run even if the shutdown steps raise.
        """
        try:
            self.logger.info("on_stop() entered. initiating shutdown")
            if self.gui_object:
                self.gui_object.stop()

            async def stop_async():
                self.logger.info("stopping all wallets")
                async with TaskGroup() as group:
                    for wallet in self._wallets.values():
                        await group.spawn(wallet.stop())
                self.logger.info("stopping network and taskgroup")
                # give network and taskgroup at most 2 seconds to wind down
                async with ignore_after(2):
                    async with TaskGroup() as group:
                        if self.network:
                            await group.spawn(self.network.stop(full_shutdown=True))
                        await group.spawn(self.taskgroup.cancel_remaining())

            fut = asyncio.run_coroutine_threadsafe(stop_async(), self.asyncio_loop)
            fut.result()
        finally:
            if self.listen_jsonrpc:
                self.logger.info("removing lockfile")
                remove_lockfile(get_lockfile(self.config))
            self.logger.info("stopped")
            self.asyncio_loop.call_soon_threadsafe(self.stopped_event.set)

    def run_gui(self, config, plugins):
        """Import and run the configured GUI in the current thread.

        'lite' and 'classic' are mapped to 'qt'. Exceptions raised by the
        GUI are logged and re-raised; on_stop() is always called afterwards.
        """
        threading.current_thread().name = 'GUI'
        gui_name = config.get('gui', 'qt')
        if gui_name in ['lite', 'classic']:
            gui_name = 'qt'
        self.logger.info(f'launching GUI: {gui_name}')
        try:
            gui = __import__('electrum.gui.' + gui_name, fromlist=['electrum'])
            self.gui_object = gui.ElectrumGui(config, self, plugins)
            self.gui_object.main()
        except BaseException as e:
            self.logger.error(f'GUI raised exception: {repr(e)}. shutting down.')
            raise
        finally:
            # app will exit now
            self.on_stop()