1#!/usr/local/bin/python3.8
2#
3# Electrum - lightweight Bitcoin client
4# Copyright (C) 2015 Thomas Voegtlin
5#
6# Permission is hereby granted, free of charge, to any person
7# obtaining a copy of this software and associated documentation files
8# (the "Software"), to deal in the Software without restriction,
9# including without limitation the rights to use, copy, modify, merge,
10# publish, distribute, sublicense, and/or sell copies of the Software,
11# and to permit persons to whom the Software is furnished to do so,
12# subject to the following conditions:
13#
14# The above copyright notice and this permission notice shall be
15# included in all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24# SOFTWARE.
25import asyncio
26import ast
27import os
28import time
29import traceback
30import sys
31import threading
32from typing import Dict, Optional, Tuple, Iterable, Callable, Union, Sequence, Mapping, TYPE_CHECKING
33from base64 import b64decode, b64encode
34from collections import defaultdict
35import json
36
37import aiohttp
38from aiohttp import web, client_exceptions
39from aiorpcx import TaskGroup, timeout_after, TaskTimeout, ignore_after
40
41from . import util
42from .network import Network
43from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
44from .invoices import PR_PAID, PR_EXPIRED
45from .util import log_exceptions, ignore_exceptions, randrange
46from .wallet import Wallet, Abstract_Wallet
47from .storage import WalletStorage
48from .wallet_db import WalletDB
49from .commands import known_commands, Commands
50from .simple_config import SimpleConfig
51from .exchange_rate import FxThread
52from .logging import get_logger, Logger
53
54if TYPE_CHECKING:
55    from electrum import gui
56
57
58_logger = get_logger(__name__)
59
60
class DaemonNotRunning(Exception):
    """Raised when no running daemon could be reached via the lockfile."""
63
def get_lockfile(config: SimpleConfig):
    """Return the path of the daemon lockfile, located under ``config.path``."""
    config_dir = config.path
    return os.path.join(config_dir, 'daemon')
66
67
def remove_lockfile(lockfile):
    """Delete the lockfile at the given path.

    Any OSError from the underlying removal (e.g. the file does not exist)
    propagates to the caller.
    """
    os.remove(lockfile)
70
71
def get_file_descriptor(config: SimpleConfig):
    """Try to take ownership of the daemon lockfile.

    The lockfile is created with O_EXCL to prevent races. If creation
    succeeds, the open file descriptor is returned. Otherwise the daemon
    described by the existing lockfile is pinged: if it answers, None is
    returned (a daemon is already running); if it does not, the lockfile
    is removed and the whole procedure is retried.
    """
    lock_path = get_lockfile(config)
    open_flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY
    while True:
        try:
            return os.open(lock_path, open_flags, 0o644)
        except OSError:
            # Could not create the lockfile exclusively; fall through and
            # check whether a daemon answers.
            pass
        try:
            request(config, 'ping')
        except DaemonNotRunning:
            # Couldn't connect; remove lockfile and try again.
            remove_lockfile(lock_path)
        else:
            return None
90
91
92
def request(config: SimpleConfig, endpoint, args=(), timeout=60):
    """Send a JSON-RPC request to the daemon described by the lockfile.

    The lockfile is expected to contain the repr of
    ``((host, port), create_time)``. The call is scheduled on the current
    asyncio event loop and this function blocks for up to `timeout` seconds
    waiting for the result.

    Raises DaemonNotRunning if the lockfile cannot be read or parsed, or if
    the connection fails and the lockfile is older than one second. While
    the lockfile is younger than that, the request is retried once per
    second, as the daemon might have just been started.
    """
    lockfile_path = get_lockfile(config)
    while True:
        started_at = None
        try:
            with open(lockfile_path) as lock_fh:
                (host, port), started_at = ast.literal_eval(lock_fh.read())
        except Exception:
            raise DaemonNotRunning()
        user, password = get_rpc_credentials(config)
        url = 'http://%s:%d' % (host, port)
        credentials = aiohttp.BasicAuth(login=user, password=password)
        event_loop = asyncio.get_event_loop()

        async def call_endpoint():
            async with aiohttp.ClientSession(auth=credentials) as session:
                client = util.JsonRPCClient(session, url)
                return await client.request(endpoint, *args)

        try:
            pending = asyncio.run_coroutine_threadsafe(call_endpoint(), event_loop)
            return pending.result(timeout=timeout)
        except aiohttp.client_exceptions.ClientConnectorError as e:
            _logger.info(f"failed to connect to JSON-RPC server {e}")
            lockfile_is_stale = not started_at or started_at < time.time() - 1.0
            if lockfile_is_stale:
                raise DaemonNotRunning()
        # Sleep a bit and try again; it might have just been started
        time.sleep(1.0)
119
120
def get_rpc_credentials(config: SimpleConfig) -> Tuple[str, str]:
    """Return the (user, password) pair used for JSON-RPC authentication.

    The values are read from the 'rpcuser' and 'rpcpassword' config keys;
    an empty string is treated like a missing value. If either one is
    missing, the user is set to 'user', a random 128-bit password is
    generated (base64 with the '-_' alternative alphabet), and both are
    written back to the config.
    """
    user = config.get('rpcuser', None)
    password = config.get('rpcpassword', None)
    # empty strings count as "not configured"
    user = None if user == '' else user
    password = None if password == '' else password
    if user is not None and password is not None:
        return user, password

    user = 'user'
    bits = 128
    nbytes = (bits + 7) // 8  # number of bytes needed to hold `bits` bits
    secret = randrange(1 << bits)
    encoded = b64encode(secret.to_bytes(nbytes, 'big'), b'-_')
    password = to_string(encoded, 'ascii')
    config.set_key('rpcuser', user)
    config.set_key('rpcpassword', password, save=True)
    return user, password
139
140
class AuthenticationError(Exception):
    """Base class for failures while authenticating an RPC request."""
143
class AuthenticationInvalidOrMissing(AuthenticationError):
    """The request carries no Authorization header, or one of an unsupported type."""
146
class AuthenticationCredentialsInvalid(AuthenticationError):
    """The supplied username/password pair does not match the expected one."""
149
class AuthenticatedServer(Logger):
    """Base class for JSON-RPC servers protected by HTTP Basic authentication.

    Subclasses register coroutine functions via register_method() and route
    incoming POST requests to handle().
    """

    def __init__(self, rpc_user, rpc_password):
        Logger.__init__(self)
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password
        # Authentication attempts are serialised under this lock, so the
        # delay after a failed attempt also holds back other pending attempts.
        self.auth_lock = asyncio.Lock()
        self._methods = {}  # type: Dict[str, Callable]

    def register_method(self, f):
        """Expose callable `f` as an RPC method under the name ``f.__name__``."""
        assert f.__name__ not in self._methods, f"name collision for {f.__name__}"
        self._methods[f.__name__] = f

    async def authenticate(self, headers):
        """Check the HTTP Basic credentials found in `headers`.

        Returns None on success, or immediately if the configured password
        is the empty string (authentication disabled).

        Raises:
            AuthenticationInvalidOrMissing: the Authorization header is
                missing, is not of type 'Basic', or cannot be decoded.
            AuthenticationCredentialsInvalid: username or password mismatch.
        """
        if self.rpc_password == '':
            # RPC authentication is disabled
            return
        auth_string = headers.get('Authorization', None)
        if auth_string is None:
            raise AuthenticationInvalidOrMissing('CredentialsMissing')
        basic, _, encoded = auth_string.partition(' ')
        if basic != 'Basic':
            raise AuthenticationInvalidOrMissing('UnsupportedType')
        try:
            credentials = to_string(b64decode(to_bytes(encoded, 'utf8')), 'utf8')
        except ValueError:
            # binascii.Error (bad base64) and UnicodeDecodeError (payload is
            # not UTF-8) are both ValueErrors. The header is client-supplied,
            # so treat garbage as invalid credentials instead of crashing.
            raise AuthenticationInvalidOrMissing('MalformedCredentials')
        username, _, password = credentials.partition(':')
        # Evaluate both comparisons unconditionally, so that a wrong username
        # does not skip the password comparison.
        user_ok = constant_time_compare(username, self.rpc_user)
        password_ok = constant_time_compare(password, self.rpc_password)
        if not (user_ok and password_ok):
            await asyncio.sleep(0.050)
            raise AuthenticationCredentialsInvalid('Invalid Credentials')

    async def handle(self, request):
        """Authenticate and dispatch a single JSON-RPC request.

        Responses:
            401 -- credentials missing, of unsupported type, or malformed
            403 -- credentials do not match
            500 -- body is not a valid request, or the method is not registered
            otherwise a JSON body with 'id', 'jsonrpc' and either 'result'
            or, if the method raised, 'error' (code 1, message str(exc)).
        """
        async with self.auth_lock:
            try:
                await self.authenticate(request.headers)
            except AuthenticationInvalidOrMissing:
                return web.Response(headers={"WWW-Authenticate": "Basic realm=Electrum"},
                                    text='Unauthorized', status=401)
            except AuthenticationCredentialsInvalid:
                return web.Response(text='Forbidden', status=403)
        try:
            payload = json.loads(await request.text())
            method = payload['method']
            _id = payload['id']
            params = payload.get('params', [])  # type: Union[Sequence, Mapping]
            if method not in self._methods:
                raise Exception(f"attempting to use unregistered method: {method}")
            f = self._methods[method]
        except Exception:
            self.logger.exception("invalid request")
            return web.Response(text='Invalid Request', status=500)
        response = {
            'id': _id,
            'jsonrpc': '2.0',
        }
        try:
            # named parameters arrive as a JSON object, positional ones as an array
            if isinstance(params, dict):
                response['result'] = await f(**params)
            else:
                response['result'] = await f(*params)
        except BaseException as e:
            # NOTE(review): BaseException also covers asyncio.CancelledError on
            # python 3.8+; kept as-is so every failure is reported to the client.
            self.logger.exception("internal error while executing RPC")
            response['error'] = {
                'code': 1,
                'message': str(e),
            }
        return web.json_response(response)
218
219
class CommandsServer(AuthenticatedServer):
    """JSON-RPC server exposing the daemon's commands.

    Registers 'ping', 'gui', every entry of `known_commands` and
    'run_cmdline' as RPC methods.
    """

    def __init__(self, daemon, fd):
        user, password = get_rpc_credentials(daemon.config)
        AuthenticatedServer.__init__(self, user, password)
        self.daemon = daemon
        # file descriptor of the lockfile; written to and closed in run()
        self.fd = fd
        self.config = daemon.config
        self.host = self.config.get('rpchost', '127.0.0.1')
        self.port = self.config.get('rpcport', 0)
        self.app = web.Application()
        self.app.router.add_post("/", self.handle)
        self.register_method(self.ping)
        self.register_method(self.gui)
        self.cmd_runner = Commands(config=self.config, network=self.daemon.network, daemon=self.daemon)
        for cmdname in known_commands:
            command_method = getattr(self.cmd_runner, cmdname)
            self.register_method(command_method)
        self.register_method(self.run_cmdline)

    async def run(self):
        """Start listening, then record the bound address in the lockfile."""
        self.runner = web.AppRunner(self.app)
        await self.runner.setup()
        site = web.TCPSite(self.runner, self.host, self.port)
        await site.start()
        listening_sock = site._server.sockets[0]
        # the lockfile holds the repr of ((host, port), creation time)
        lock_contents = repr((listening_sock.getsockname(), time.time()))
        os.write(self.fd, lock_contents.encode('utf8'))
        os.close(self.fd)

    async def ping(self):
        """Liveness check; always returns True."""
        return True

    async def gui(self, config_options):
        """Ask the running GUI, if any, to open a new window.

        Returns "ok" on success, otherwise a string describing the error.
        """
        gui_object = self.daemon.gui_object
        if not gui_object:
            return "Error: Electrum is running in daemon mode. Please stop the daemon first."
        if not hasattr(gui_object, 'new_window'):
            return "error: current GUI does not support multiple windows"
        path = self.config.get_wallet_path(use_gui_last_wallet=True)
        gui_object.new_window(path, config_options.get('url'))
        return "ok"

    async def run_cmdline(self, config_options):
        """Run the command named by ``config_options['cmd']``.

        Positional arguments and options are looked up in `config_options`
        by name. If the command raises an Exception, ``{'error': str(exc)}``
        is returned instead of its result.
        """
        cmd = known_commands[config_options['cmd']]
        # positional arguments, json-decoded
        args = [json_decode(config_options.get(name)) for name in cmd.params]
        # options
        kwargs = {name: config_options.get(name) for name in cmd.options}
        if 'wallet_path' in cmd.options:
            kwargs['wallet_path'] = config_options.get('wallet_path')
        elif 'wallet' in cmd.options:
            kwargs['wallet'] = config_options.get('wallet_path')
        func = getattr(self.cmd_runner, cmd.name)
        # fixme: not sure how to retrieve message in jsonrpcclient
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            return {'error': str(e)}
285
286
class WatchTowerServer(AuthenticatedServer):
    """JSON-RPC server exposing the local watchtower's sweep store.

    Registers 'get_ctn' and 'add_sweep_tx' as RPC methods. Credentials come
    from the 'watchtower_user' / 'watchtower_password' config keys, both of
    which default to the empty string.
    """

    def __init__(self, network, netaddress):
        self.addr = netaddress
        self.config = network.config
        self.network = network
        AuthenticatedServer.__init__(
            self,
            self.config.get('watchtower_user', ''),
            self.config.get('watchtower_password', ''),
        )
        self.lnwatcher = network.local_watchtower
        self.app = web.Application()
        self.app.router.add_post("/", self.handle)
        for rpc_method in (self.get_ctn, self.add_sweep_tx):
            self.register_method(rpc_method)

    async def run(self):
        """Start listening on the configured address, using the config's SSL context."""
        runner = web.AppRunner(self.app)
        self.runner = runner
        await runner.setup()
        site = web.TCPSite(
            runner,
            host=str(self.addr.host),
            port=self.addr.port,
            ssl_context=self.config.get_ssl_context(),
        )
        await site.start()

    async def get_ctn(self, *args):
        """Forward to the sweep store's get_ctn()."""
        sweepstore = self.lnwatcher.sweepstore
        return await sweepstore.get_ctn(*args)

    async def add_sweep_tx(self, *args):
        """Forward to the sweep store's add_sweep_tx()."""
        sweepstore = self.lnwatcher.sweepstore
        return await sweepstore.add_sweep_tx(*args)
313
314
class PayServer(Logger):
    """HTTP server that serves payment requests of the first loaded wallet.

    Routes (registered in run()):
        GET  /api/get_invoice      -- formatted request as JSON
        GET  /api/get_status       -- websocket reporting the request status
        GET  /bip70/{key}.bip70    -- serialized BIP70 payment request
        GET  <root>/...            -- static files from the 'www' directory
        POST /api/create_invoice   -- only if 'payserver_allow_create_invoice'
    """

    def __init__(self, daemon: 'Daemon', netaddress):
        Logger.__init__(self)
        self.addr = netaddress
        self.daemon = daemon
        self.config = daemon.config
        # URL prefix of the static pages. Stored on the instance because
        # create_request() needs it to build its redirect target.
        self.root = self.config.get('payserver_root', '/r')
        # request key -> event that gets set once the request is paid
        self.pending = defaultdict(asyncio.Event)
        util.register_callback(self.on_payment, ['request_status'])

    @property
    def wallet(self):
        # FIXME specify wallet somehow?
        return list(self.daemon.get_wallets().values())[0]

    async def on_payment(self, evt, wallet, key, status):
        """'request_status' callback: wake up waiters once `key` is paid."""
        if status == PR_PAID:
            self.pending[key].set()

    @ignore_exceptions
    @log_exceptions
    async def run(self):
        """Register the routes and start listening on the configured address."""
        app = web.Application()
        app.add_routes([web.get('/api/get_invoice', self.get_request)])
        app.add_routes([web.get('/api/get_status', self.get_status)])
        app.add_routes([web.get('/bip70/{key}.bip70', self.get_bip70_request)])
        app.add_routes([web.static(self.root, os.path.join(os.path.dirname(__file__), 'www'))])
        if self.config.get('payserver_allow_create_invoice'):
            app.add_routes([web.post('/api/create_invoice', self.create_request)])
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, host=str(self.addr.host), port=self.addr.port, ssl_context=self.config.get_ssl_context())
        await site.start()

    async def create_request(self, request):
        """Create a lightning request from the POSTed form and redirect to its pay page.

        Form fields: 'amount_sat' (required, decimal digits) and 'message'
        (optional, defaults to "donation").

        Raises:
            web.HTTPUnsupportedMediaType: 'amount_sat' is missing or not a number.
            web.HTTPFound: on success, redirecting to ``<root>/pay?id=<key>``.
        """
        params = await request.post()
        wallet = self.wallet
        amount_str = params.get('amount_sat')
        # isdecimal() rather than isdigit(): the latter also accepts characters
        # such as superscript digits, which int() rejects with a ValueError.
        if not isinstance(amount_str, str) or not amount_str.isdecimal():
            raise web.HTTPUnsupportedMediaType()
        amount = int(amount_str)
        # 'message' is optional; a missing field must not raise KeyError
        message = params.get('message') or "donation"
        payment_hash = wallet.lnworker.add_request(
            amount_sat=amount,
            message=message,
            expiry=3600)
        key = payment_hash.hex()
        raise web.HTTPFound(self.root + '/pay?id=' + key)

    async def get_request(self, r):
        """Return the formatted request named by the raw query string, as JSON."""
        key = r.query_string
        request = self.wallet.get_formatted_request(key)
        return web.json_response(request)

    async def get_bip70_request(self, r):
        """Return the BIP70 payment request for the key in the URL.

        Raises web.HTTPNotFound if the wallet has no such request.
        """
        from .paymentrequest import make_request
        key = r.match_info['key']
        request = self.wallet.get_request(key)
        if not request:
            # raise, like the other handlers do, instead of returning the exception
            raise web.HTTPNotFound()
        pr = make_request(self.config, request)
        return web.Response(body=pr.SerializeToString(), content_type='application/bitcoin-paymentrequest')

    async def get_status(self, request):
        """Websocket endpoint reporting the status of the request named by the query string.

        Sends 'unknown invoice', 'paid' or 'expired' and closes if the status
        is already final. Otherwise sends 'waiting' once per second until
        the request is paid, then sends 'paid' and closes.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        key = request.query_string
        info = self.wallet.get_formatted_request(key)
        if not info:
            await ws.send_str('unknown invoice')
            await ws.close()
            return ws
        if info.get('status') == PR_PAID:
            await ws.send_str('paid')
            await ws.close()
            return ws
        if info.get('status') == PR_EXPIRED:
            await ws.send_str('expired')
            await ws.close()
            return ws
        while True:
            try:
                await asyncio.wait_for(self.pending[key].wait(), 1)
                break
            except asyncio.TimeoutError:
                # send data on the websocket, to keep it alive
                await ws.send_str('waiting')
        await ws.send_str('paid')
        await ws.close()
        return ws
405
406
407
class Daemon(Logger):
    """Long-running process state: network, FxThread, loaded wallets and servers.

    Depending on the configuration, the constructor also sets up the
    JSON-RPC commands server, the pay server and the watchtower server, and
    schedules them in a task group on the asyncio event loop.
    """

    network: Optional[Network]
    gui_object: Optional[Union['gui.qt.ElectrumGui', 'gui.kivy.ElectrumGui']]

    @profiler
    def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
        """
        config: the configuration to run with.
        fd: file descriptor of an already-created lockfile. If None and
            `listen_jsonrpc` is set, the lockfile is created here.
        listen_jsonrpc: whether to start the JSON-RPC commands server.

        Raises Exception if the lockfile could not be obtained.
        """
        Logger.__init__(self)
        self.config = config
        self.listen_jsonrpc = listen_jsonrpc
        if fd is None and listen_jsonrpc:
            fd = get_file_descriptor(config)
            if fd is None:
                raise Exception('failed to lock daemon; already running?')
        if 'wallet_path' in config.cmdline_options:
            self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
                                "Use the load_wallet command instead.")
        self.asyncio_loop = asyncio.get_event_loop()
        self.network = None
        if not config.get('offline'):
            self.network = Network(config, daemon=self)
        self.fx = FxThread(config, self.network)
        self.gui_object = None
        # path -> wallet;   make sure path is standardized.
        self._wallets = {}  # type: Dict[str, Abstract_Wallet]
        daemon_jobs = []
        # Setup commands server
        self.commands_server = None
        if listen_jsonrpc:
            self.commands_server = CommandsServer(self, fd)
            daemon_jobs.append(self.commands_server.run())
        # pay server
        self.pay_server = None
        payserver_address = self.config.get_netaddress('payserver_address')
        if not config.get('offline') and payserver_address:
            self.pay_server = PayServer(self, payserver_address)
            daemon_jobs.append(self.pay_server.run())
        # server-side watchtower
        self.watchtower = None
        watchtower_address = self.config.get_netaddress('watchtower_address')
        if not config.get('offline') and watchtower_address:
            self.watchtower = WatchTowerServer(self.network, watchtower_address)
            daemon_jobs.append(self.watchtower.run)
        if self.network:
            self.network.start(jobs=[self.fx.run])
            # prepare lightning functionality, also load channel db early
            if self.config.get('use_gossip', False):
                self.network.start_gossip()

        # set once shutdown has been requested; run_daemon() blocks on it
        self.stopping_soon = threading.Event()
        # set at the end of on_stop(); stop() awaits it
        self.stopped_event = asyncio.Event()
        self.taskgroup = TaskGroup()
        asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)

    @log_exceptions
    async def _run(self, jobs: Iterable = None):
        """Run `jobs` inside the daemon's task group until it is cancelled.

        Sets `stopping_soon` when the task group exits, for whatever reason.
        """
        if jobs is None:
            jobs = []
        self.logger.info("starting taskgroup.")
        try:
            async with self.taskgroup as group:
                for job in jobs:
                    await group.spawn(job)
                await group.spawn(asyncio.Event().wait)  # run forever (until cancel)
        except asyncio.CancelledError:
            raise
        except Exception:
            self.logger.exception("taskgroup died.")
        finally:
            self.logger.info("taskgroup stopped.")
            self.stopping_soon.set()

    def load_wallet(self, path, password, *, manual_upgrades=True) -> Optional[Abstract_Wallet]:
        """Return the wallet at `path`, loading and starting it if necessary.

        Returns None if the file does not exist, if it is encrypted and no
        password was given, or if the wallet db requires a split, an upgrade
        or a pending action.
        """
        path = standardize_path(path)
        # wizard will be launched if we return
        if path in self._wallets:
            return self._wallets[path]
        storage = WalletStorage(path)
        if not storage.file_exists():
            return None
        if storage.is_encrypted():
            if not password:
                return None
            storage.decrypt(password)
        # read data, pass it to db
        db = WalletDB(storage.read(), manual_upgrades=manual_upgrades)
        if db.requires_split():
            return None
        if db.requires_upgrade():
            return None
        if db.get_action():
            return None
        wallet = Wallet(db, storage, config=self.config)
        wallet.start_network(self.network)
        self._wallets[path] = wallet
        return wallet

    def add_wallet(self, wallet: Abstract_Wallet) -> None:
        """Register an already-constructed wallet under its standardized storage path."""
        path = wallet.storage.path
        path = standardize_path(path)
        self._wallets[path] = wallet

    def get_wallet(self, path: str) -> Optional[Abstract_Wallet]:
        """Return the loaded wallet for `path`, or None if it is not loaded."""
        path = standardize_path(path)
        return self._wallets.get(path)

    def get_wallets(self) -> Dict[str, Abstract_Wallet]:
        """Return a copy of the path -> wallet mapping."""
        return dict(self._wallets)  # copy

    def delete_wallet(self, path: str) -> bool:
        """Stop the wallet at `path` and delete its file.

        Returns True iff a file existed at `path` and was removed.
        """
        self.stop_wallet(path)
        if os.path.exists(path):
            os.unlink(path)
            return True
        return False

    def stop_wallet(self, path: str) -> bool:
        """Returns True iff a wallet was found."""
        # note: this must not be called from the event loop. # TODO raise if so
        fut = asyncio.run_coroutine_threadsafe(self._stop_wallet(path), self.asyncio_loop)
        return fut.result()

    async def _stop_wallet(self, path: str) -> bool:
        """Returns True iff a wallet was found."""
        path = standardize_path(path)
        wallet = self._wallets.pop(path, None)
        if not wallet:
            return False
        await wallet.stop()
        return True

    def run_daemon(self):
        """Block the calling thread until shutdown is requested, then run on_stop()."""
        try:
            self.stopping_soon.wait()
        except KeyboardInterrupt:
            self.stopping_soon.set()
        self.on_stop()

    async def stop(self):
        """Request shutdown and wait until on_stop() has finished."""
        self.stopping_soon.set()
        await self.stopped_event.wait()

    def on_stop(self):
        """Shut down the GUI, all wallets, the network and the task group.

        Afterwards the lockfile is removed (if the JSON-RPC server was
        enabled) and `stopped_event` is set. The event is set even if
        cleanup raised, so that a pending stop() does not wait forever.
        """
        try:
            self.logger.info("on_stop() entered. initiating shutdown")
            if self.gui_object:
                self.gui_object.stop()

            async def stop_async():
                self.logger.info("stopping all wallets")
                async with TaskGroup() as group:
                    for wallet in self._wallets.values():
                        await group.spawn(wallet.stop())
                self.logger.info("stopping network and taskgroup")
                # give network and taskgroup at most 2 seconds to wind down
                async with ignore_after(2):
                    async with TaskGroup() as group:
                        if self.network:
                            await group.spawn(self.network.stop(full_shutdown=True))
                        await group.spawn(self.taskgroup.cancel_remaining())

            fut = asyncio.run_coroutine_threadsafe(stop_async(), self.asyncio_loop)
            fut.result()
        finally:
            try:
                if self.listen_jsonrpc:
                    self.logger.info("removing lockfile")
                    remove_lockfile(get_lockfile(self.config))
            finally:
                # must run even if removing the lockfile failed
                self.logger.info("stopped")
                self.asyncio_loop.call_soon_threadsafe(self.stopped_event.set)

    def run_gui(self, config, plugins):
        """Import and run the GUI selected by the 'gui' config key (default 'qt').

        Blocks in the GUI's main(); on_stop() is called when it returns or
        raises. Exceptions from the GUI are logged and re-raised.
        """
        threading.current_thread().setName('GUI')
        gui_name = config.get('gui', 'qt')
        if gui_name in ['lite', 'classic']:
            gui_name = 'qt'
        self.logger.info(f'launching GUI: {gui_name}')
        try:
            gui = __import__('electrum.gui.' + gui_name, fromlist=['electrum'])
            self.gui_object = gui.ElectrumGui(config, self, plugins)
            self.gui_object.main()
        except BaseException as e:
            self.logger.error(f'GUI raised exception: {repr(e)}. shutting down.')
            raise
        finally:
            # app will exit now
            self.on_stop()