1# Electrum - lightweight Bitcoin client
2# Copyright (C) 2018 The Electrum Developers
3#
4# Permission is hereby granted, free of charge, to any person
5# obtaining a copy of this software and associated documentation files
6# (the "Software"), to deal in the Software without restriction,
7# including without limitation the rights to use, copy, modify, merge,
8# publish, distribute, sublicense, and/or sell copies of the Software,
9# and to permit persons to whom the Software is furnished to do so,
10# subject to the following conditions:
11#
12# The above copyright notice and this permission notice shall be
13# included in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
19# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22# SOFTWARE.
23
import asyncio
import functools
import itertools
import threading
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List

from aiorpcx import TaskGroup

from . import bitcoin, util
from .bitcoin import COINBASE_MATURITY
from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException, with_lock
from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction
from .synchronizer import Synchronizer
from .verifier import SPV
from .blockchain import hash_header, Blockchain
from .i18n import _
from .logging import Logger
42
43if TYPE_CHECKING:
44    from .network import Network
45    from .wallet_db import WalletDB
46
47
# Special (non-positive) "height" values for transactions that are not mined.
# Heights > 0 are treated as confirmed (see add_transaction).
TX_HEIGHT_FUTURE = -3  # in future_tx, and its wanted height is still above the local height
TX_HEIGHT_LOCAL = -2  # local transaction (see get_tx_height)
TX_HEIGHT_UNCONF_PARENT = -1  # treated as a mempool transaction (see add_transaction)
TX_HEIGHT_UNCONFIRMED = 0  # treated as a mempool transaction (see add_transaction)
52
53
class HistoryItem(NamedTuple):
    """One entry of the list returned by AddressSynchronizer.get_history."""
    txid: str
    tx_mined_status: TxMinedInfo  # as returned by get_tx_height
    delta: int  # sum of the tx's deltas over the addresses of the requested domain
    fee: Optional[int]  # as returned by get_tx_fee
    balance: int  # running balance of the domain right after this tx
60
61
class TxWalletDelta(NamedTuple):
    """Effect of a transaction on the wallet, as computed by get_wallet_delta."""
    is_relevant: bool  # "related to wallet?"
    is_any_input_ismine: bool
    is_all_input_ismine: bool
    delta: int  # value of is_mine outputs minus value of is_mine inputs
    fee: Optional[int]  # may be None when the value of some input is not known
68
69
70class AddressSynchronizer(Logger):
71    """
72    inherited by wallet
73    """
74
75    network: Optional['Network']
76    synchronizer: Optional['Synchronizer']
77    verifier: Optional['SPV']
78
    def __init__(self, db: 'WalletDB'):
        """Set up the in-memory state on top of `db`, then load and clean up the history.

        The network related members (network, synchronizer, verifier) stay
        None until start_network is called.
        """
        self.db = db
        self.network = None
        Logger.__init__(self)
        # verifier (SPV) and synchronizer are started in start_network
        self.synchronizer = None
        self.verifier = None
        # locks: if you need to take multiple ones, acquire them in the order they are defined here!
        self.lock = threading.RLock()
        self.transaction_lock = threading.RLock()
        self.future_tx = {}  # type: Dict[str, int]  # txid -> wanted height
        # Transactions pending verification.  txid -> tx_height. Access with self.lock.
        self.unverified_tx = defaultdict(int)
        # true when synchronized
        self.up_to_date = False
        # thread local storage for caching stuff
        self.threadlocal_cache = threading.local()

        # entries are dropped per address whenever a tx touching that address is added/removed
        self._get_addr_balance_cache = {}

        # must come last: it uses the locks and maps initialised above
        self.load_and_cleanup()
100
101    def with_transaction_lock(func):
102        def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs):
103            with self.transaction_lock:
104                return func(self, *args, **kwargs)
105        return func_wrapper
106
107    def load_and_cleanup(self):
108        self.load_local_history()
109        self.check_history()
110        self.load_unverified_transactions()
111        self.remove_local_transactions_we_dont_have()
112
113    def is_mine(self, address: Optional[str]) -> bool:
114        if not address: return False
115        return self.db.is_addr_in_history(address)
116
117    def get_addresses(self):
118        return sorted(self.db.get_history())
119
120    def get_address_history(self, addr: str) -> Sequence[Tuple[str, int]]:
121        """Returns the history for the address, in the format that would be returned by a server.
122
123        Note: The difference between db.get_addr_history and this method is that
124        db.get_addr_history stores the response from a server, so it only includes txns
125        a server sees, i.e. that does not contain local and future txns.
126        """
127        h = []
128        # we need self.transaction_lock but get_tx_height will take self.lock
129        # so we need to take that too here, to enforce order of locks
130        with self.lock, self.transaction_lock:
131            related_txns = self._history_local.get(addr, set())
132            for tx_hash in related_txns:
133                tx_height = self.get_tx_height(tx_hash).height
134                h.append((tx_hash, tx_height))
135        return h
136
137    def get_address_history_len(self, addr: str) -> int:
138        """Return number of transactions where address is involved."""
139        return len(self._history_local.get(addr, ()))
140
141    def get_txin_address(self, txin: TxInput) -> Optional[str]:
142        if isinstance(txin, PartialTxInput):
143            if txin.address:
144                return txin.address
145        prevout_hash = txin.prevout.txid.hex()
146        prevout_n = txin.prevout.out_idx
147        for addr in self.db.get_txo_addresses(prevout_hash):
148            d = self.db.get_txo_addr(prevout_hash, addr)
149            if prevout_n in d:
150                return addr
151        tx = self.db.get_transaction(prevout_hash)
152        if tx:
153            return tx.outputs()[prevout_n].address
154        return None
155
156    def get_txin_value(self, txin: TxInput, *, address: str = None) -> Optional[int]:
157        if txin.value_sats() is not None:
158            return txin.value_sats()
159        prevout_hash = txin.prevout.txid.hex()
160        prevout_n = txin.prevout.out_idx
161        if address is None:
162            address = self.get_txin_address(txin)
163        if address:
164            d = self.db.get_txo_addr(prevout_hash, address)
165            try:
166                v, cb = d[prevout_n]
167                return v
168            except KeyError:
169                pass
170        tx = self.db.get_transaction(prevout_hash)
171        if tx:
172            return tx.outputs()[prevout_n].value
173        return None
174
175    def load_unverified_transactions(self):
176        # review transactions that are in the history
177        for addr in self.db.get_history():
178            hist = self.db.get_addr_history(addr)
179            for tx_hash, tx_height in hist:
180                # add it in case it was previously unconfirmed
181                self.add_unverified_tx(tx_hash, tx_height)
182
183    def start_network(self, network: Optional['Network']) -> None:
184        self.network = network
185        if self.network is not None:
186            self.synchronizer = Synchronizer(self)
187            self.verifier = SPV(self.network, self)
188            util.register_callback(self.on_blockchain_updated, ['blockchain_updated'])
189
190    def on_blockchain_updated(self, event, *args):
191        self._get_addr_balance_cache = {}  # invalidate cache
192
193    async def stop(self):
194        if self.network:
195            try:
196                async with TaskGroup() as group:
197                    if self.synchronizer:
198                        await group.spawn(self.synchronizer.stop())
199                    if self.verifier:
200                        await group.spawn(self.verifier.stop())
201            finally:  # even if we get cancelled
202                self.synchronizer = None
203                self.verifier = None
204                util.unregister_callback(self.on_blockchain_updated)
205                self.db.put('stored_height', self.get_local_height())
206
207    def add_address(self, address):
208        if not self.db.get_addr_history(address):
209            self.db.history[address] = []
210            self.set_up_to_date(False)
211        if self.synchronizer:
212            self.synchronizer.add(address)
213
214    def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False):
215        """Returns a set of transaction hashes from the wallet history that are
216        directly conflicting with tx, i.e. they have common outpoints being
217        spent with tx.
218
219        include_self specifies whether the tx itself should be reported as a
220        conflict (if already in wallet history)
221        """
222        conflicting_txns = set()
223        with self.transaction_lock:
224            for txin in tx.inputs():
225                if txin.is_coinbase_input():
226                    continue
227                prevout_hash = txin.prevout.txid.hex()
228                prevout_n = txin.prevout.out_idx
229                spending_tx_hash = self.db.get_spent_outpoint(prevout_hash, prevout_n)
230                if spending_tx_hash is None:
231                    continue
232                # this outpoint has already been spent, by spending_tx
233                # annoying assert that has revealed several bugs over time:
234                assert self.db.get_transaction(spending_tx_hash), "spending tx not in wallet db"
235                conflicting_txns |= {spending_tx_hash}
236            if tx_hash in conflicting_txns:
237                # this tx is already in history, so it conflicts with itself
238                if len(conflicting_txns) > 1:
239                    raise Exception('Found conflicting transactions already in wallet history.')
240                if not include_self:
241                    conflicting_txns -= {tx_hash}
242            return conflicting_txns
243
    def add_transaction(self, tx: Transaction, *, allow_unrelated=False) -> bool:
        """
        Returns whether the tx was successfully added to the wallet history.
        Note that a transaction may need to be added several times, if our
        list of addresses has increased. This will return True even if the
        transaction was already in self.db.

        If allow_unrelated is False and neither an input nor an output of tx
        is is_mine, UnrelatedTransactionException is raised.
        A plain Exception is raised if tx has no txid.
        False is returned (and nothing is changed) if tx loses against
        conflicting transactions that are already in the history.
        """
        assert tx, tx
        # note: tx.is_complete() is not necessarily True; tx might be partial
        # but it *needs* to have a txid:
        tx_hash = tx.txid()
        if tx_hash is None:
            raise Exception("cannot add tx without txid to wallet history")
        # we need self.transaction_lock but get_tx_height will take self.lock
        # so we need to take that too here, to enforce order of locks
        with self.lock, self.transaction_lock:
            # NOTE: returning if tx in self.transactions might seem like a good idea
            # BUT we track is_mine inputs in a txn, and during subsequent calls
            # of add_transaction tx, we might learn of more-and-more inputs of
            # being is_mine, as we roll the gap_limit forward
            is_coinbase = tx.inputs()[0].is_coinbase_input()
            tx_height = self.get_tx_height(tx_hash).height
            if not allow_unrelated:
                # note that during sync, if the transactions are not properly sorted,
                # it could happen that we think tx is unrelated but actually one of the inputs is is_mine.
                # this is the main motivation for allow_unrelated
                is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()])
                is_for_me = any([self.is_mine(txo.address) for txo in tx.outputs()])
                if not is_mine and not is_for_me:
                    raise UnrelatedTransactionException()
            # Find all conflicting transactions.
            # In case of a conflict,
            #     1. confirmed > mempool > local
            #     2. this new txn has priority over existing ones
            # When this method exits, there must NOT be any conflict, so
            # either keep this txn and remove all conflicting (along with dependencies)
            #     or drop this txn
            conflicting_txns = self.get_conflicting_transactions(tx_hash, tx)
            if conflicting_txns:
                existing_mempool_txn = any(
                    self.get_tx_height(tx_hash2).height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT)
                    for tx_hash2 in conflicting_txns)
                existing_confirmed_txn = any(
                    self.get_tx_height(tx_hash2).height > 0
                    for tx_hash2 in conflicting_txns)
                if existing_confirmed_txn and tx_height <= 0:
                    # this is a non-confirmed tx that conflicts with confirmed txns; drop.
                    return False
                if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL:
                    # this is a local tx that conflicts with non-local txns; drop.
                    return False
                # keep this txn and remove all conflicting
                for tx_hash2 in conflicting_txns:
                    self.remove_transaction(tx_hash2)
            # add inputs
            def add_value_from_prev_output():
                # reads txi, prevout_hash, prevout_n and ser, which are set by the loop below
                # note: this takes linear time in num is_mine outputs of prev_tx
                addr = self.get_txin_address(txi)
                if addr and self.is_mine(addr):
                    outputs = self.db.get_txo_addr(prevout_hash, addr)
                    try:
                        v, is_cb = outputs[prevout_n]
                    except KeyError:
                        # the spent output is not recorded (yet) for this address
                        pass
                    else:
                        self.db.add_txi_addr(tx_hash, addr, ser, v)
                        self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
            for txi in tx.inputs():
                if txi.is_coinbase_input():
                    continue
                prevout_hash = txi.prevout.txid.hex()
                prevout_n = txi.prevout.out_idx
                ser = txi.prevout.to_str()
                self.db.set_spent_outpoint(prevout_hash, prevout_n, tx_hash)
                add_value_from_prev_output()
            # add outputs
            for n, txo in enumerate(tx.outputs()):
                v = txo.value
                ser = tx_hash + ':%d'%n
                scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
                self.db.add_prevout_by_scripthash(scripthash, prevout=TxOutpoint.from_str(ser), value=v)
                addr = txo.address
                if addr and self.is_mine(addr):
                    self.db.add_txo_addr(tx_hash, addr, n, v, is_coinbase)
                    self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
                    # give v to txi that spends me
                    next_tx = self.db.get_spent_outpoint(tx_hash, n)
                    if next_tx is not None:
                        self.db.add_txi_addr(next_tx, addr, ser, v)
                        self._add_tx_to_local_history(next_tx)
            # add to local history
            self._add_tx_to_local_history(tx_hash)
            # save
            self.db.add_transaction(tx_hash, tx)
            self.db.add_num_inputs_to_tx(tx_hash, len(tx.inputs()))
            return True
340
341    def remove_transaction(self, tx_hash: str) -> None:
342        """Removes a transaction AND all its dependents/children
343        from the wallet history.
344        """
345        with self.lock, self.transaction_lock:
346            to_remove = {tx_hash}
347            to_remove |= self.get_depending_transactions(tx_hash)
348            for txid in to_remove:
349                self._remove_transaction(txid)
350
351    def _remove_transaction(self, tx_hash: str) -> None:
352        """Removes a single transaction from the wallet history, and attempts
353         to undo all effects of the tx (spending inputs, creating outputs, etc).
354        """
355        def remove_from_spent_outpoints():
356            # undo spends in spent_outpoints
357            if tx is not None:
358                # if we have the tx, this branch is faster
359                for txin in tx.inputs():
360                    if txin.is_coinbase_input():
361                        continue
362                    prevout_hash = txin.prevout.txid.hex()
363                    prevout_n = txin.prevout.out_idx
364                    self.db.remove_spent_outpoint(prevout_hash, prevout_n)
365            else:
366                # expensive but always works
367                for prevout_hash, prevout_n in self.db.list_spent_outpoints():
368                    spending_txid = self.db.get_spent_outpoint(prevout_hash, prevout_n)
369                    if spending_txid == tx_hash:
370                        self.db.remove_spent_outpoint(prevout_hash, prevout_n)
371
372        with self.lock, self.transaction_lock:
373            self.logger.info(f"removing tx from history {tx_hash}")
374            tx = self.db.remove_transaction(tx_hash)
375            remove_from_spent_outpoints()
376            self._remove_tx_from_local_history(tx_hash)
377            for addr in itertools.chain(self.db.get_txi_addresses(tx_hash), self.db.get_txo_addresses(tx_hash)):
378                self._get_addr_balance_cache.pop(addr, None)  # invalidate cache
379            self.db.remove_txi(tx_hash)
380            self.db.remove_txo(tx_hash)
381            self.db.remove_tx_fee(tx_hash)
382            self.db.remove_verified_tx(tx_hash)
383            self.unverified_tx.pop(tx_hash, None)
384            if tx:
385                for idx, txo in enumerate(tx.outputs()):
386                    scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex())
387                    prevout = TxOutpoint(bfh(tx_hash), idx)
388                    self.db.remove_prevout_by_scripthash(scripthash, prevout=prevout, value=txo.value)
389
390    def get_depending_transactions(self, tx_hash: str) -> Set[str]:
391        """Returns all (grand-)children of tx_hash in this wallet."""
392        with self.transaction_lock:
393            children = set()
394            for n in self.db.get_spent_outpoints(tx_hash):
395                other_hash = self.db.get_spent_outpoint(tx_hash, n)
396                children.add(other_hash)
397                children |= self.get_depending_transactions(other_hash)
398            return children
399
    def receive_tx_callback(self, tx_hash: str, tx: Transaction, tx_height: int) -> None:
        """Record the height of a received tx and add the tx to the wallet history."""
        # record the height first, so that add_transaction sees it via get_tx_height
        self.add_unverified_tx(tx_hash, tx_height)
        # unrelated txns are allowed here (see the note about allow_unrelated in add_transaction)
        self.add_transaction(tx, allow_unrelated=True)
403
404    def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]):
405        with self.lock:
406            old_hist = self.get_address_history(addr)
407            for tx_hash, height in old_hist:
408                if (tx_hash, height) not in hist:
409                    # make tx local
410                    self.unverified_tx.pop(tx_hash, None)
411                    self.db.remove_verified_tx(tx_hash)
412                    if self.verifier:
413                        self.verifier.remove_spv_proof_for_tx(tx_hash)
414            self.db.set_addr_history(addr, hist)
415
416        for tx_hash, tx_height in hist:
417            # add it in case it was previously unconfirmed
418            self.add_unverified_tx(tx_hash, tx_height)
419            # if addr is new, we have to recompute txi and txo
420            tx = self.db.get_transaction(tx_hash)
421            if tx is None:
422                continue
423            self.add_transaction(tx, allow_unrelated=True)
424
425        # Store fees
426        for tx_hash, fee_sat in tx_fees.items():
427            self.db.add_tx_fee_from_server(tx_hash, fee_sat)
428
429    @profiler
430    def load_local_history(self):
431        self._history_local = {}  # type: Dict[str, Set[str]]  # address -> set(txid)
432        self._address_history_changed_events = defaultdict(asyncio.Event)  # address -> Event
433        for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
434            self._add_tx_to_local_history(txid)
435
436    @profiler
437    def check_history(self):
438        hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.db.get_history()))
439        hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.db.get_history()))
440        for addr in hist_addrs_not_mine:
441            self.db.remove_addr_history(addr)
442        for addr in hist_addrs_mine:
443            hist = self.db.get_addr_history(addr)
444            for tx_hash, tx_height in hist:
445                if self.db.get_txi_addresses(tx_hash) or self.db.get_txo_addresses(tx_hash):
446                    continue
447                tx = self.db.get_transaction(tx_hash)
448                if tx is not None:
449                    self.add_transaction(tx, allow_unrelated=True)
450
451    def remove_local_transactions_we_dont_have(self):
452        for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()):
453            tx_height = self.get_tx_height(txid).height
454            if tx_height == TX_HEIGHT_LOCAL and not self.db.get_transaction(txid):
455                self.remove_transaction(txid)
456
457    def clear_history(self):
458        with self.lock:
459            with self.transaction_lock:
460                self.db.clear_history()
461                self._history_local.clear()
462                self._get_addr_balance_cache = {}  # invalidate cache
463
464    def get_txpos(self, tx_hash):
465        """Returns (height, txpos) tuple, even if the tx is unverified."""
466        with self.lock:
467            verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
468            if verified_tx_mined_info:
469                return verified_tx_mined_info.height, verified_tx_mined_info.txpos
470            elif tx_hash in self.unverified_tx:
471                height = self.unverified_tx[tx_hash]
472                return (height, -1) if height > 0 else ((1e9 - height), -1)
473            else:
474                return (1e9+1, -1)
475
476    def with_local_height_cached(func):
477        # get local height only once, as it's relatively expensive.
478        # take care that nested calls work as expected
479        def f(self, *args, **kwargs):
480            orig_val = getattr(self.threadlocal_cache, 'local_height', None)
481            self.threadlocal_cache.local_height = orig_val or self.get_local_height()
482            try:
483                return func(self, *args, **kwargs)
484            finally:
485                self.threadlocal_cache.local_height = orig_val
486        return f
487
488    @with_lock
489    @with_transaction_lock
490    @with_local_height_cached
491    def get_history(self, *, domain=None) -> Sequence[HistoryItem]:
492        # get domain
493        if domain is None:
494            domain = self.get_addresses()
495        domain = set(domain)
496        # 1. Get the history of each address in the domain, maintain the
497        #    delta of a tx as the sum of its deltas on domain addresses
498        tx_deltas = defaultdict(int)  # type: Dict[str, int]
499        for addr in domain:
500            h = self.get_address_history(addr)
501            for tx_hash, height in h:
502                tx_deltas[tx_hash] += self.get_tx_delta(tx_hash, addr)
503        # 2. create sorted history
504        history = []
505        for tx_hash in tx_deltas:
506            delta = tx_deltas[tx_hash]
507            tx_mined_status = self.get_tx_height(tx_hash)
508            fee = self.get_tx_fee(tx_hash)
509            history.append((tx_hash, tx_mined_status, delta, fee))
510        history.sort(key = lambda x: self.get_txpos(x[0]), reverse=True)
511        # 3. add balance
512        c, u, x = self.get_balance(domain)
513        balance = c + u + x
514        h2 = []
515        for tx_hash, tx_mined_status, delta, fee in history:
516            h2.append(HistoryItem(txid=tx_hash,
517                                  tx_mined_status=tx_mined_status,
518                                  delta=delta,
519                                  fee=fee,
520                                  balance=balance))
521            balance -= delta
522        h2.reverse()
523
524        if balance != 0:
525            raise Exception("wallet.get_history() failed balance sanity-check")
526
527        return h2
528
529    def _add_tx_to_local_history(self, txid):
530        with self.transaction_lock:
531            for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
532                cur_hist = self._history_local.get(addr, set())
533                cur_hist.add(txid)
534                self._history_local[addr] = cur_hist
535                self._mark_address_history_changed(addr)
536
537    def _remove_tx_from_local_history(self, txid):
538        with self.transaction_lock:
539            for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)):
540                cur_hist = self._history_local.get(addr, set())
541                try:
542                    cur_hist.remove(txid)
543                except KeyError:
544                    pass
545                else:
546                    self._history_local[addr] = cur_hist
547
548    def _mark_address_history_changed(self, addr: str) -> None:
549        # history for this address changed, wake up coroutines:
550        self._address_history_changed_events[addr].set()
551        # clear event immediately so that coroutines can wait() for the next change:
552        self._address_history_changed_events[addr].clear()
553
554    async def wait_for_address_history_to_change(self, addr: str) -> None:
555        """Wait until the server tells us about a new transaction related to addr.
556
557        Unconfirmed and confirmed transactions are not distinguished, and so e.g. SPV
558        is not taken into account.
559        """
560        assert self.is_mine(addr), "address needs to be is_mine to be watched"
561        await self._address_history_changed_events[addr].wait()
562
563    def add_unverified_tx(self, tx_hash, tx_height):
564        if self.db.is_in_verified_tx(tx_hash):
565            if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT):
566                with self.lock:
567                    self.db.remove_verified_tx(tx_hash)
568                if self.verifier:
569                    self.verifier.remove_spv_proof_for_tx(tx_hash)
570        else:
571            with self.lock:
572                # tx will be verified only if height > 0
573                self.unverified_tx[tx_hash] = tx_height
574
575    def remove_unverified_tx(self, tx_hash, tx_height):
576        with self.lock:
577            new_height = self.unverified_tx.get(tx_hash)
578            if new_height == tx_height:
579                self.unverified_tx.pop(tx_hash, None)
580
581    def add_verified_tx(self, tx_hash: str, info: TxMinedInfo):
582        # Remove from the unverified map and add to the verified map
583        with self.lock:
584            self.unverified_tx.pop(tx_hash, None)
585            self.db.add_verified_tx(tx_hash, info)
586        tx_mined_status = self.get_tx_height(tx_hash)
587        util.trigger_callback('verified', self, tx_hash, tx_mined_status)
588
589    def get_unverified_txs(self):
590        '''Returns a map from tx hash to transaction height'''
591        with self.lock:
592            return dict(self.unverified_tx)  # copy
593
594    def undo_verifications(self, blockchain: Blockchain, above_height: int) -> Set[str]:
595        '''Used by the verifier when a reorg has happened'''
596        txs = set()
597        with self.lock:
598            for tx_hash in self.db.list_verified_tx():
599                info = self.db.get_verified_tx(tx_hash)
600                tx_height = info.height
601                if tx_height > above_height:
602                    header = blockchain.read_header(tx_height)
603                    if not header or hash_header(header) != info.header_hash:
604                        self.db.remove_verified_tx(tx_hash)
605                        # NOTE: we should add these txns to self.unverified_tx,
606                        # but with what height?
607                        # If on the new fork after the reorg, the txn is at the
608                        # same height, we will not get a status update for the
609                        # address. If the txn is not mined or at a diff height,
610                        # we should get a status update. Unless we put tx into
611                        # unverified_tx, it will turn into local. So we put it
612                        # into unverified_tx with the old height, and if we get
613                        # a status update, that will overwrite it.
614                        self.unverified_tx[tx_hash] = tx_height
615                        txs.add(tx_hash)
616        return txs
617
618    def get_local_height(self) -> int:
619        """ return last known height if we are offline """
620        cached_local_height = getattr(self.threadlocal_cache, 'local_height', None)
621        if cached_local_height is not None:
622            return cached_local_height
623        return self.network.get_local_height() if self.network else self.db.get('stored_height', 0)
624
625    def add_future_tx(self, tx: Transaction, wanted_height: int) -> bool:
626        with self.lock:
627            tx_was_added = self.add_transaction(tx)
628            if tx_was_added:
629                self.future_tx[tx.txid()] = wanted_height
630            return tx_was_added
631
632    def get_tx_height(self, tx_hash: str) -> TxMinedInfo:
633        if tx_hash is None:  # ugly backwards compat...
634            return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
635        with self.lock:
636            verified_tx_mined_info = self.db.get_verified_tx(tx_hash)
637            if verified_tx_mined_info:
638                conf = max(self.get_local_height() - verified_tx_mined_info.height + 1, 0)
639                return verified_tx_mined_info._replace(conf=conf)
640            elif tx_hash in self.unverified_tx:
641                height = self.unverified_tx[tx_hash]
642                return TxMinedInfo(height=height, conf=0)
643            elif tx_hash in self.future_tx:
644                num_blocks_remainining = self.future_tx[tx_hash] - self.get_local_height()
645                if num_blocks_remainining > 0:
646                    return TxMinedInfo(height=TX_HEIGHT_FUTURE, conf=-num_blocks_remainining)
647                else:
648                    return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
649            else:
650                # local transaction
651                return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0)
652
653    def set_up_to_date(self, up_to_date):
654        with self.lock:
655            status_changed = self.up_to_date != up_to_date
656            self.up_to_date = up_to_date
657        if self.network:
658            self.network.notify('status')
659        if status_changed:
660            self.logger.info(f'set_up_to_date: {up_to_date}')
661
662    def is_up_to_date(self):
663        with self.lock: return self.up_to_date
664
665    def get_history_sync_state_details(self) -> Tuple[int, int]:
666        if self.synchronizer:
667            return self.synchronizer.num_requests_sent_and_answered()
668        else:
669            return 0, 0
670
671    @with_transaction_lock
672    def get_tx_delta(self, tx_hash: str, address: str) -> int:
673        """effect of tx on address"""
674        delta = 0
675        # subtract the value of coins sent from address
676        d = self.db.get_txi_addr(tx_hash, address)
677        for n, v in d:
678            delta -= v
679        # add the value of the coins received at address
680        d = self.db.get_txo_addr(tx_hash, address)
681        for n, (v, cb) in d.items():
682            delta += v
683        return delta
684
685    def get_wallet_delta(self, tx: Transaction) -> TxWalletDelta:
686        """effect of tx on wallet"""
687        is_relevant = False  # "related to wallet?"
688        num_input_ismine = 0
689        v_in = v_in_mine = v_out = v_out_mine = 0
690        with self.lock, self.transaction_lock:
691            for txin in tx.inputs():
692                addr = self.get_txin_address(txin)
693                value = self.get_txin_value(txin, address=addr)
694                if self.is_mine(addr):
695                    num_input_ismine += 1
696                    is_relevant = True
697                    assert value is not None
698                    v_in_mine += value
699                if value is None:
700                    v_in = None
701                elif v_in is not None:
702                    v_in += value
703            for txout in tx.outputs():
704                v_out += txout.value
705                if self.is_mine(txout.address):
706                    v_out_mine += txout.value
707                    is_relevant = True
708        delta = v_out_mine - v_in_mine
709        if v_in is not None:
710            fee = v_in - v_out
711        else:
712            fee = None
713        if fee is None and isinstance(tx, PartialTransaction):
714            fee = tx.get_fee()
715        return TxWalletDelta(
716            is_relevant=is_relevant,
717            is_any_input_ismine=num_input_ismine > 0,
718            is_all_input_ismine=num_input_ismine == len(tx.inputs()),
719            delta=delta,
720            fee=fee,
721        )
722
723    def get_tx_fee(self, txid: str) -> Optional[int]:
724        """ Returns tx_fee or None. Use server fee only if tx is unconfirmed and not mine"""
725        # check if stored fee is available
726        fee = self.db.get_tx_fee(txid, trust_server=False)
727        if fee is not None:
728            return fee
729        # delete server-sent fee for confirmed txns
730        confirmed = self.get_tx_height(txid).conf > 0
731        if confirmed:
732            self.db.add_tx_fee_from_server(txid, None)
733        # if all inputs are ismine, try to calc fee now;
734        # otherwise, return stored value
735        num_all_inputs = self.db.get_num_all_inputs_of_tx(txid)
736        if num_all_inputs is not None:
737            # check if tx is mine
738            num_ismine_inputs = self.db.get_num_ismine_inputs_of_tx(txid)
739            assert num_ismine_inputs <= num_all_inputs, (num_ismine_inputs, num_all_inputs)
740            # trust server if tx is unconfirmed and not mine
741            if num_ismine_inputs < num_all_inputs:
742                return None if confirmed else self.db.get_tx_fee(txid, trust_server=True)
743        # lookup tx and deserialize it.
744        # note that deserializing is expensive, hence above hacks
745        tx = self.db.get_transaction(txid)
746        if not tx:
747            return None
748        fee = self.get_wallet_delta(tx).fee
749        # save result
750        self.db.add_tx_fee_we_calculated(txid, fee)
751        self.db.add_num_inputs_to_tx(txid, len(tx.inputs()))
752        return fee
753
754    def get_addr_io(self, address):
755        with self.lock, self.transaction_lock:
756            h = self.get_address_history(address)
757            received = {}
758            sent = {}
759            for tx_hash, height in h:
760                d = self.db.get_txo_addr(tx_hash, address)
761                for n, (v, is_cb) in d.items():
762                    received[tx_hash + ':%d'%n] = (height, v, is_cb)
763            for tx_hash, height in h:
764                l = self.db.get_txi_addr(tx_hash, address)
765                for txi, v in l:
766                    sent[txi] = height
767        return received, sent
768
769
770    def get_addr_outputs(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
771        coins, spent = self.get_addr_io(address)
772        out = {}
773        for prevout_str, v in coins.items():
774            tx_height, value, is_cb = v
775            prevout = TxOutpoint.from_str(prevout_str)
776            utxo = PartialTxInput(prevout=prevout, is_coinbase_output=is_cb)
777            utxo._trusted_address = address
778            utxo._trusted_value_sats = value
779            utxo.block_height = tx_height
780            utxo.spent_height = spent.get(prevout_str, None)
781            out[prevout] = utxo
782        return out
783
784    def get_addr_utxo(self, address: str) -> Dict[TxOutpoint, PartialTxInput]:
785        out = self.get_addr_outputs(address)
786        for k, v in list(out.items()):
787            if v.spent_height is not None:
788                out.pop(k)
789        return out
790
791    # return the total amount ever received by an address
792    def get_addr_received(self, address):
793        received, sent = self.get_addr_io(address)
794        return sum([v for height, v, is_cb in received.values()])
795
796    @with_local_height_cached
797    def get_addr_balance(self, address, *, excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
798        """Return the balance of a bitcoin address:
799        confirmed and matured, unconfirmed, unmatured
800        """
801        if not excluded_coins:  # cache is only used if there are no excluded_coins
802            cached_value = self._get_addr_balance_cache.get(address)
803            if cached_value:
804                return cached_value
805        if excluded_coins is None:
806            excluded_coins = set()
807        assert isinstance(excluded_coins, set), f"excluded_coins should be set, not {type(excluded_coins)}"
808        received, sent = self.get_addr_io(address)
809        c = u = x = 0
810        mempool_height = self.get_local_height() + 1  # height of next block
811        for txo, (tx_height, v, is_cb) in received.items():
812            if txo in excluded_coins:
813                continue
814            if is_cb and tx_height + COINBASE_MATURITY > mempool_height:
815                x += v
816            elif tx_height > 0:
817                c += v
818            else:
819                u += v
820            if txo in sent:
821                if sent[txo] > 0:
822                    c -= v
823                else:
824                    u -= v
825        result = c, u, x
826        # cache result.
827        if not excluded_coins:
828            # Cache needs to be invalidated if a transaction is added to/
829            # removed from history; or on new blocks (maturity...)
830            self._get_addr_balance_cache[address] = result
831        return result
832
833    @with_local_height_cached
834    def get_utxos(
835            self,
836            domain=None,
837            *,
838            excluded_addresses=None,
839            mature_only: bool = False,
840            confirmed_funding_only: bool = False,
841            confirmed_spending_only: bool = False,
842            nonlocal_only: bool = False,
843            block_height: int = None,
844    ) -> Sequence[PartialTxInput]:
845        if block_height is not None:
846            # caller wants the UTXOs we had at a given height; check other parameters
847            assert confirmed_funding_only
848            assert confirmed_spending_only
849            assert nonlocal_only
850        else:
851            block_height = self.get_local_height()
852        coins = []
853        if domain is None:
854            domain = self.get_addresses()
855        domain = set(domain)
856        if excluded_addresses:
857            domain = set(domain) - set(excluded_addresses)
858        mempool_height = block_height + 1  # height of next block
859        for addr in domain:
860            txos = self.get_addr_outputs(addr)
861            for txo in txos.values():
862                if txo.spent_height is not None:
863                    if not confirmed_spending_only:
864                        continue
865                    if confirmed_spending_only and 0 < txo.spent_height <= block_height:
866                        continue
867                if confirmed_funding_only and not (0 < txo.block_height <= block_height):
868                    continue
869                if nonlocal_only and txo.block_height in (TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE):
870                    continue
871                if (mature_only and txo.is_coinbase_output()
872                        and txo.block_height + COINBASE_MATURITY > mempool_height):
873                    continue
874                coins.append(txo)
875                continue
876        return coins
877
878    def get_balance(self, domain=None, *, excluded_addresses: Set[str] = None,
879                    excluded_coins: Set[str] = None) -> Tuple[int, int, int]:
880        if domain is None:
881            domain = self.get_addresses()
882        if excluded_addresses is None:
883            excluded_addresses = set()
884        assert isinstance(excluded_addresses, set), f"excluded_addresses should be set, not {type(excluded_addresses)}"
885        domain = set(domain) - excluded_addresses
886        cc = uu = xx = 0
887        for addr in domain:
888            c, u, x = self.get_addr_balance(addr, excluded_coins=excluded_coins)
889            cc += c
890            uu += u
891            xx += x
892        return cc, uu, xx
893
894    def is_used(self, address: str) -> bool:
895        return self.get_address_history_len(address) != 0
896
897    def is_empty(self, address: str) -> bool:
898        c, u, x = self.get_addr_balance(address)
899        return c+u+x == 0
900
901    def synchronize(self):
902        pass
903