1# Electrum - lightweight Bitcoin client 2# Copyright (C) 2018 The Electrum Developers 3# 4# Permission is hereby granted, free of charge, to any person 5# obtaining a copy of this software and associated documentation files 6# (the "Software"), to deal in the Software without restriction, 7# including without limitation the rights to use, copy, modify, merge, 8# publish, distribute, sublicense, and/or sell copies of the Software, 9# and to permit persons to whom the Software is furnished to do so, 10# subject to the following conditions: 11# 12# The above copyright notice and this permission notice shall be 13# included in all copies or substantial portions of the Software. 14# 15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 16# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 17# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 18# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 19# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 20# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 21# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 22# SOFTWARE. 23 24import asyncio 25import threading 26import asyncio 27import itertools 28from collections import defaultdict 29from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List 30 31from aiorpcx import TaskGroup 32 33from . import bitcoin, util 34from .bitcoin import COINBASE_MATURITY 35from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException, with_lock 36from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction 37from .synchronizer import Synchronizer 38from .verifier import SPV 39from .blockchain import hash_header, Blockchain 40from .i18n import _ 41from .logging import Logger 42 43if TYPE_CHECKING: 44 from .network import Network 45 from .wallet_db import WalletDB 46 47 48TX_HEIGHT_FUTURE = -3 49TX_HEIGHT_LOCAL = -2 50TX_HEIGHT_UNCONF_PARENT = -1 51TX_HEIGHT_UNCONFIRMED = 0 52 53 54class HistoryItem(NamedTuple): 55 txid: str 56 tx_mined_status: TxMinedInfo 57 delta: int 58 fee: Optional[int] 59 balance: int 60 61 62class TxWalletDelta(NamedTuple): 63 is_relevant: bool # "related to wallet?" 64 is_any_input_ismine: bool 65 is_all_input_ismine: bool 66 delta: int 67 fee: Optional[int] 68 69 70class AddressSynchronizer(Logger): 71 """ 72 inherited by wallet 73 """ 74 75 network: Optional['Network'] 76 synchronizer: Optional['Synchronizer'] 77 verifier: Optional['SPV'] 78 79 def __init__(self, db: 'WalletDB'): 80 self.db = db 81 self.network = None 82 Logger.__init__(self) 83 # verifier (SPV) and synchronizer are started in start_network 84 self.synchronizer = None 85 self.verifier = None 86 # locks: if you need to take multiple ones, acquire them in the order they are defined here! 87 self.lock = threading.RLock() 88 self.transaction_lock = threading.RLock() 89 self.future_tx = {} # type: Dict[str, int] # txid -> wanted height 90 # Transactions pending verification. txid -> tx_height. Access with self.lock. 91 self.unverified_tx = defaultdict(int) 92 # true when synchronized 93 self.up_to_date = False 94 # thread local storage for caching stuff 95 self.threadlocal_cache = threading.local() 96 97 self._get_addr_balance_cache = {} 98 99 self.load_and_cleanup() 100 101 def with_transaction_lock(func): 102 def func_wrapper(self: 'AddressSynchronizer', *args, **kwargs): 103 with self.transaction_lock: 104 return func(self, *args, **kwargs) 105 return func_wrapper 106 107 def load_and_cleanup(self): 108 self.load_local_history() 109 self.check_history() 110 self.load_unverified_transactions() 111 self.remove_local_transactions_we_dont_have() 112 113 def is_mine(self, address: Optional[str]) -> bool: 114 if not address: return False 115 return self.db.is_addr_in_history(address) 116 117 def get_addresses(self): 118 return sorted(self.db.get_history()) 119 120 def get_address_history(self, addr: str) -> Sequence[Tuple[str, int]]: 121 """Returns the history for the address, in the format that would be returned by a server. 122 123 Note: The difference between db.get_addr_history and this method is that 124 db.get_addr_history stores the response from a server, so it only includes txns 125 a server sees, i.e. that does not contain local and future txns. 126 """ 127 h = [] 128 # we need self.transaction_lock but get_tx_height will take self.lock 129 # so we need to take that too here, to enforce order of locks 130 with self.lock, self.transaction_lock: 131 related_txns = self._history_local.get(addr, set()) 132 for tx_hash in related_txns: 133 tx_height = self.get_tx_height(tx_hash).height 134 h.append((tx_hash, tx_height)) 135 return h 136 137 def get_address_history_len(self, addr: str) -> int: 138 """Return number of transactions where address is involved.""" 139 return len(self._history_local.get(addr, ())) 140 141 def get_txin_address(self, txin: TxInput) -> Optional[str]: 142 if isinstance(txin, PartialTxInput): 143 if txin.address: 144 return txin.address 145 prevout_hash = txin.prevout.txid.hex() 146 prevout_n = txin.prevout.out_idx 147 for addr in self.db.get_txo_addresses(prevout_hash): 148 d = self.db.get_txo_addr(prevout_hash, addr) 149 if prevout_n in d: 150 return addr 151 tx = self.db.get_transaction(prevout_hash) 152 if tx: 153 return tx.outputs()[prevout_n].address 154 return None 155 156 def get_txin_value(self, txin: TxInput, *, address: str = None) -> Optional[int]: 157 if txin.value_sats() is not None: 158 return txin.value_sats() 159 prevout_hash = txin.prevout.txid.hex() 160 prevout_n = txin.prevout.out_idx 161 if address is None: 162 address = self.get_txin_address(txin) 163 if address: 164 d = self.db.get_txo_addr(prevout_hash, address) 165 try: 166 v, cb = d[prevout_n] 167 return v 168 except KeyError: 169 pass 170 tx = self.db.get_transaction(prevout_hash) 171 if tx: 172 return tx.outputs()[prevout_n].value 173 return None 174 175 def load_unverified_transactions(self): 176 # review transactions that are in the history 177 for addr in self.db.get_history(): 178 hist = self.db.get_addr_history(addr) 179 for tx_hash, tx_height in hist: 180 # add it in case it was previously unconfirmed 181 self.add_unverified_tx(tx_hash, tx_height) 182 183 def start_network(self, network: Optional['Network']) -> None: 184 self.network = network 185 if self.network is not None: 186 self.synchronizer = Synchronizer(self) 187 self.verifier = SPV(self.network, self) 188 util.register_callback(self.on_blockchain_updated, ['blockchain_updated']) 189 190 def on_blockchain_updated(self, event, *args): 191 self._get_addr_balance_cache = {} # invalidate cache 192 193 async def stop(self): 194 if self.network: 195 try: 196 async with TaskGroup() as group: 197 if self.synchronizer: 198 await group.spawn(self.synchronizer.stop()) 199 if self.verifier: 200 await group.spawn(self.verifier.stop()) 201 finally: # even if we get cancelled 202 self.synchronizer = None 203 self.verifier = None 204 util.unregister_callback(self.on_blockchain_updated) 205 self.db.put('stored_height', self.get_local_height()) 206 207 def add_address(self, address): 208 if not self.db.get_addr_history(address): 209 self.db.history[address] = [] 210 self.set_up_to_date(False) 211 if self.synchronizer: 212 self.synchronizer.add(address) 213 214 def get_conflicting_transactions(self, tx_hash, tx: Transaction, include_self=False): 215 """Returns a set of transaction hashes from the wallet history that are 216 directly conflicting with tx, i.e. they have common outpoints being 217 spent with tx. 218 219 include_self specifies whether the tx itself should be reported as a 220 conflict (if already in wallet history) 221 """ 222 conflicting_txns = set() 223 with self.transaction_lock: 224 for txin in tx.inputs(): 225 if txin.is_coinbase_input(): 226 continue 227 prevout_hash = txin.prevout.txid.hex() 228 prevout_n = txin.prevout.out_idx 229 spending_tx_hash = self.db.get_spent_outpoint(prevout_hash, prevout_n) 230 if spending_tx_hash is None: 231 continue 232 # this outpoint has already been spent, by spending_tx 233 # annoying assert that has revealed several bugs over time: 234 assert self.db.get_transaction(spending_tx_hash), "spending tx not in wallet db" 235 conflicting_txns |= {spending_tx_hash} 236 if tx_hash in conflicting_txns: 237 # this tx is already in history, so it conflicts with itself 238 if len(conflicting_txns) > 1: 239 raise Exception('Found conflicting transactions already in wallet history.') 240 if not include_self: 241 conflicting_txns -= {tx_hash} 242 return conflicting_txns 243 244 def add_transaction(self, tx: Transaction, *, allow_unrelated=False) -> bool: 245 """ 246 Returns whether the tx was successfully added to the wallet history. 247 Note that a transaction may need to be added several times, if our 248 list of addresses has increased. This will return True even if the 249 transaction was already in self.db. 250 """ 251 assert tx, tx 252 # note: tx.is_complete() is not necessarily True; tx might be partial 253 # but it *needs* to have a txid: 254 tx_hash = tx.txid() 255 if tx_hash is None: 256 raise Exception("cannot add tx without txid to wallet history") 257 # we need self.transaction_lock but get_tx_height will take self.lock 258 # so we need to take that too here, to enforce order of locks 259 with self.lock, self.transaction_lock: 260 # NOTE: returning if tx in self.transactions might seem like a good idea 261 # BUT we track is_mine inputs in a txn, and during subsequent calls 262 # of add_transaction tx, we might learn of more-and-more inputs of 263 # being is_mine, as we roll the gap_limit forward 264 is_coinbase = tx.inputs()[0].is_coinbase_input() 265 tx_height = self.get_tx_height(tx_hash).height 266 if not allow_unrelated: 267 # note that during sync, if the transactions are not properly sorted, 268 # it could happen that we think tx is unrelated but actually one of the inputs is is_mine. 269 # this is the main motivation for allow_unrelated 270 is_mine = any([self.is_mine(self.get_txin_address(txin)) for txin in tx.inputs()]) 271 is_for_me = any([self.is_mine(txo.address) for txo in tx.outputs()]) 272 if not is_mine and not is_for_me: 273 raise UnrelatedTransactionException() 274 # Find all conflicting transactions. 275 # In case of a conflict, 276 # 1. confirmed > mempool > local 277 # 2. this new txn has priority over existing ones 278 # When this method exits, there must NOT be any conflict, so 279 # either keep this txn and remove all conflicting (along with dependencies) 280 # or drop this txn 281 conflicting_txns = self.get_conflicting_transactions(tx_hash, tx) 282 if conflicting_txns: 283 existing_mempool_txn = any( 284 self.get_tx_height(tx_hash2).height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT) 285 for tx_hash2 in conflicting_txns) 286 existing_confirmed_txn = any( 287 self.get_tx_height(tx_hash2).height > 0 288 for tx_hash2 in conflicting_txns) 289 if existing_confirmed_txn and tx_height <= 0: 290 # this is a non-confirmed tx that conflicts with confirmed txns; drop. 291 return False 292 if existing_mempool_txn and tx_height == TX_HEIGHT_LOCAL: 293 # this is a local tx that conflicts with non-local txns; drop. 294 return False 295 # keep this txn and remove all conflicting 296 for tx_hash2 in conflicting_txns: 297 self.remove_transaction(tx_hash2) 298 # add inputs 299 def add_value_from_prev_output(): 300 # note: this takes linear time in num is_mine outputs of prev_tx 301 addr = self.get_txin_address(txi) 302 if addr and self.is_mine(addr): 303 outputs = self.db.get_txo_addr(prevout_hash, addr) 304 try: 305 v, is_cb = outputs[prevout_n] 306 except KeyError: 307 pass 308 else: 309 self.db.add_txi_addr(tx_hash, addr, ser, v) 310 self._get_addr_balance_cache.pop(addr, None) # invalidate cache 311 for txi in tx.inputs(): 312 if txi.is_coinbase_input(): 313 continue 314 prevout_hash = txi.prevout.txid.hex() 315 prevout_n = txi.prevout.out_idx 316 ser = txi.prevout.to_str() 317 self.db.set_spent_outpoint(prevout_hash, prevout_n, tx_hash) 318 add_value_from_prev_output() 319 # add outputs 320 for n, txo in enumerate(tx.outputs()): 321 v = txo.value 322 ser = tx_hash + ':%d'%n 323 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex()) 324 self.db.add_prevout_by_scripthash(scripthash, prevout=TxOutpoint.from_str(ser), value=v) 325 addr = txo.address 326 if addr and self.is_mine(addr): 327 self.db.add_txo_addr(tx_hash, addr, n, v, is_coinbase) 328 self._get_addr_balance_cache.pop(addr, None) # invalidate cache 329 # give v to txi that spends me 330 next_tx = self.db.get_spent_outpoint(tx_hash, n) 331 if next_tx is not None: 332 self.db.add_txi_addr(next_tx, addr, ser, v) 333 self._add_tx_to_local_history(next_tx) 334 # add to local history 335 self._add_tx_to_local_history(tx_hash) 336 # save 337 self.db.add_transaction(tx_hash, tx) 338 self.db.add_num_inputs_to_tx(tx_hash, len(tx.inputs())) 339 return True 340 341 def remove_transaction(self, tx_hash: str) -> None: 342 """Removes a transaction AND all its dependents/children 343 from the wallet history. 344 """ 345 with self.lock, self.transaction_lock: 346 to_remove = {tx_hash} 347 to_remove |= self.get_depending_transactions(tx_hash) 348 for txid in to_remove: 349 self._remove_transaction(txid) 350 351 def _remove_transaction(self, tx_hash: str) -> None: 352 """Removes a single transaction from the wallet history, and attempts 353 to undo all effects of the tx (spending inputs, creating outputs, etc). 354 """ 355 def remove_from_spent_outpoints(): 356 # undo spends in spent_outpoints 357 if tx is not None: 358 # if we have the tx, this branch is faster 359 for txin in tx.inputs(): 360 if txin.is_coinbase_input(): 361 continue 362 prevout_hash = txin.prevout.txid.hex() 363 prevout_n = txin.prevout.out_idx 364 self.db.remove_spent_outpoint(prevout_hash, prevout_n) 365 else: 366 # expensive but always works 367 for prevout_hash, prevout_n in self.db.list_spent_outpoints(): 368 spending_txid = self.db.get_spent_outpoint(prevout_hash, prevout_n) 369 if spending_txid == tx_hash: 370 self.db.remove_spent_outpoint(prevout_hash, prevout_n) 371 372 with self.lock, self.transaction_lock: 373 self.logger.info(f"removing tx from history {tx_hash}") 374 tx = self.db.remove_transaction(tx_hash) 375 remove_from_spent_outpoints() 376 self._remove_tx_from_local_history(tx_hash) 377 for addr in itertools.chain(self.db.get_txi_addresses(tx_hash), self.db.get_txo_addresses(tx_hash)): 378 self._get_addr_balance_cache.pop(addr, None) # invalidate cache 379 self.db.remove_txi(tx_hash) 380 self.db.remove_txo(tx_hash) 381 self.db.remove_tx_fee(tx_hash) 382 self.db.remove_verified_tx(tx_hash) 383 self.unverified_tx.pop(tx_hash, None) 384 if tx: 385 for idx, txo in enumerate(tx.outputs()): 386 scripthash = bitcoin.script_to_scripthash(txo.scriptpubkey.hex()) 387 prevout = TxOutpoint(bfh(tx_hash), idx) 388 self.db.remove_prevout_by_scripthash(scripthash, prevout=prevout, value=txo.value) 389 390 def get_depending_transactions(self, tx_hash: str) -> Set[str]: 391 """Returns all (grand-)children of tx_hash in this wallet.""" 392 with self.transaction_lock: 393 children = set() 394 for n in self.db.get_spent_outpoints(tx_hash): 395 other_hash = self.db.get_spent_outpoint(tx_hash, n) 396 children.add(other_hash) 397 children |= self.get_depending_transactions(other_hash) 398 return children 399 400 def receive_tx_callback(self, tx_hash: str, tx: Transaction, tx_height: int) -> None: 401 self.add_unverified_tx(tx_hash, tx_height) 402 self.add_transaction(tx, allow_unrelated=True) 403 404 def receive_history_callback(self, addr: str, hist, tx_fees: Dict[str, int]): 405 with self.lock: 406 old_hist = self.get_address_history(addr) 407 for tx_hash, height in old_hist: 408 if (tx_hash, height) not in hist: 409 # make tx local 410 self.unverified_tx.pop(tx_hash, None) 411 self.db.remove_verified_tx(tx_hash) 412 if self.verifier: 413 self.verifier.remove_spv_proof_for_tx(tx_hash) 414 self.db.set_addr_history(addr, hist) 415 416 for tx_hash, tx_height in hist: 417 # add it in case it was previously unconfirmed 418 self.add_unverified_tx(tx_hash, tx_height) 419 # if addr is new, we have to recompute txi and txo 420 tx = self.db.get_transaction(tx_hash) 421 if tx is None: 422 continue 423 self.add_transaction(tx, allow_unrelated=True) 424 425 # Store fees 426 for tx_hash, fee_sat in tx_fees.items(): 427 self.db.add_tx_fee_from_server(tx_hash, fee_sat) 428 429 @profiler 430 def load_local_history(self): 431 self._history_local = {} # type: Dict[str, Set[str]] # address -> set(txid) 432 self._address_history_changed_events = defaultdict(asyncio.Event) # address -> Event 433 for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()): 434 self._add_tx_to_local_history(txid) 435 436 @profiler 437 def check_history(self): 438 hist_addrs_mine = list(filter(lambda k: self.is_mine(k), self.db.get_history())) 439 hist_addrs_not_mine = list(filter(lambda k: not self.is_mine(k), self.db.get_history())) 440 for addr in hist_addrs_not_mine: 441 self.db.remove_addr_history(addr) 442 for addr in hist_addrs_mine: 443 hist = self.db.get_addr_history(addr) 444 for tx_hash, tx_height in hist: 445 if self.db.get_txi_addresses(tx_hash) or self.db.get_txo_addresses(tx_hash): 446 continue 447 tx = self.db.get_transaction(tx_hash) 448 if tx is not None: 449 self.add_transaction(tx, allow_unrelated=True) 450 451 def remove_local_transactions_we_dont_have(self): 452 for txid in itertools.chain(self.db.list_txi(), self.db.list_txo()): 453 tx_height = self.get_tx_height(txid).height 454 if tx_height == TX_HEIGHT_LOCAL and not self.db.get_transaction(txid): 455 self.remove_transaction(txid) 456 457 def clear_history(self): 458 with self.lock: 459 with self.transaction_lock: 460 self.db.clear_history() 461 self._history_local.clear() 462 self._get_addr_balance_cache = {} # invalidate cache 463 464 def get_txpos(self, tx_hash): 465 """Returns (height, txpos) tuple, even if the tx is unverified.""" 466 with self.lock: 467 verified_tx_mined_info = self.db.get_verified_tx(tx_hash) 468 if verified_tx_mined_info: 469 return verified_tx_mined_info.height, verified_tx_mined_info.txpos 470 elif tx_hash in self.unverified_tx: 471 height = self.unverified_tx[tx_hash] 472 return (height, -1) if height > 0 else ((1e9 - height), -1) 473 else: 474 return (1e9+1, -1) 475 476 def with_local_height_cached(func): 477 # get local height only once, as it's relatively expensive. 478 # take care that nested calls work as expected 479 def f(self, *args, **kwargs): 480 orig_val = getattr(self.threadlocal_cache, 'local_height', None) 481 self.threadlocal_cache.local_height = orig_val or self.get_local_height() 482 try: 483 return func(self, *args, **kwargs) 484 finally: 485 self.threadlocal_cache.local_height = orig_val 486 return f 487 488 @with_lock 489 @with_transaction_lock 490 @with_local_height_cached 491 def get_history(self, *, domain=None) -> Sequence[HistoryItem]: 492 # get domain 493 if domain is None: 494 domain = self.get_addresses() 495 domain = set(domain) 496 # 1. Get the history of each address in the domain, maintain the 497 # delta of a tx as the sum of its deltas on domain addresses 498 tx_deltas = defaultdict(int) # type: Dict[str, int] 499 for addr in domain: 500 h = self.get_address_history(addr) 501 for tx_hash, height in h: 502 tx_deltas[tx_hash] += self.get_tx_delta(tx_hash, addr) 503 # 2. create sorted history 504 history = [] 505 for tx_hash in tx_deltas: 506 delta = tx_deltas[tx_hash] 507 tx_mined_status = self.get_tx_height(tx_hash) 508 fee = self.get_tx_fee(tx_hash) 509 history.append((tx_hash, tx_mined_status, delta, fee)) 510 history.sort(key = lambda x: self.get_txpos(x[0]), reverse=True) 511 # 3. add balance 512 c, u, x = self.get_balance(domain) 513 balance = c + u + x 514 h2 = [] 515 for tx_hash, tx_mined_status, delta, fee in history: 516 h2.append(HistoryItem(txid=tx_hash, 517 tx_mined_status=tx_mined_status, 518 delta=delta, 519 fee=fee, 520 balance=balance)) 521 balance -= delta 522 h2.reverse() 523 524 if balance != 0: 525 raise Exception("wallet.get_history() failed balance sanity-check") 526 527 return h2 528 529 def _add_tx_to_local_history(self, txid): 530 with self.transaction_lock: 531 for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)): 532 cur_hist = self._history_local.get(addr, set()) 533 cur_hist.add(txid) 534 self._history_local[addr] = cur_hist 535 self._mark_address_history_changed(addr) 536 537 def _remove_tx_from_local_history(self, txid): 538 with self.transaction_lock: 539 for addr in itertools.chain(self.db.get_txi_addresses(txid), self.db.get_txo_addresses(txid)): 540 cur_hist = self._history_local.get(addr, set()) 541 try: 542 cur_hist.remove(txid) 543 except KeyError: 544 pass 545 else: 546 self._history_local[addr] = cur_hist 547 548 def _mark_address_history_changed(self, addr: str) -> None: 549 # history for this address changed, wake up coroutines: 550 self._address_history_changed_events[addr].set() 551 # clear event immediately so that coroutines can wait() for the next change: 552 self._address_history_changed_events[addr].clear() 553 554 async def wait_for_address_history_to_change(self, addr: str) -> None: 555 """Wait until the server tells us about a new transaction related to addr. 556 557 Unconfirmed and confirmed transactions are not distinguished, and so e.g. SPV 558 is not taken into account. 559 """ 560 assert self.is_mine(addr), "address needs to be is_mine to be watched" 561 await self._address_history_changed_events[addr].wait() 562 563 def add_unverified_tx(self, tx_hash, tx_height): 564 if self.db.is_in_verified_tx(tx_hash): 565 if tx_height in (TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT): 566 with self.lock: 567 self.db.remove_verified_tx(tx_hash) 568 if self.verifier: 569 self.verifier.remove_spv_proof_for_tx(tx_hash) 570 else: 571 with self.lock: 572 # tx will be verified only if height > 0 573 self.unverified_tx[tx_hash] = tx_height 574 575 def remove_unverified_tx(self, tx_hash, tx_height): 576 with self.lock: 577 new_height = self.unverified_tx.get(tx_hash) 578 if new_height == tx_height: 579 self.unverified_tx.pop(tx_hash, None) 580 581 def add_verified_tx(self, tx_hash: str, info: TxMinedInfo): 582 # Remove from the unverified map and add to the verified map 583 with self.lock: 584 self.unverified_tx.pop(tx_hash, None) 585 self.db.add_verified_tx(tx_hash, info) 586 tx_mined_status = self.get_tx_height(tx_hash) 587 util.trigger_callback('verified', self, tx_hash, tx_mined_status) 588 589 def get_unverified_txs(self): 590 '''Returns a map from tx hash to transaction height''' 591 with self.lock: 592 return dict(self.unverified_tx) # copy 593 594 def undo_verifications(self, blockchain: Blockchain, above_height: int) -> Set[str]: 595 '''Used by the verifier when a reorg has happened''' 596 txs = set() 597 with self.lock: 598 for tx_hash in self.db.list_verified_tx(): 599 info = self.db.get_verified_tx(tx_hash) 600 tx_height = info.height 601 if tx_height > above_height: 602 header = blockchain.read_header(tx_height) 603 if not header or hash_header(header) != info.header_hash: 604 self.db.remove_verified_tx(tx_hash) 605 # NOTE: we should add these txns to self.unverified_tx, 606 # but with what height? 607 # If on the new fork after the reorg, the txn is at the 608 # same height, we will not get a status update for the 609 # address. If the txn is not mined or at a diff height, 610 # we should get a status update. Unless we put tx into 611 # unverified_tx, it will turn into local. So we put it 612 # into unverified_tx with the old height, and if we get 613 # a status update, that will overwrite it. 614 self.unverified_tx[tx_hash] = tx_height 615 txs.add(tx_hash) 616 return txs 617 618 def get_local_height(self) -> int: 619 """ return last known height if we are offline """ 620 cached_local_height = getattr(self.threadlocal_cache, 'local_height', None) 621 if cached_local_height is not None: 622 return cached_local_height 623 return self.network.get_local_height() if self.network else self.db.get('stored_height', 0) 624 625 def add_future_tx(self, tx: Transaction, wanted_height: int) -> bool: 626 with self.lock: 627 tx_was_added = self.add_transaction(tx) 628 if tx_was_added: 629 self.future_tx[tx.txid()] = wanted_height 630 return tx_was_added 631 632 def get_tx_height(self, tx_hash: str) -> TxMinedInfo: 633 if tx_hash is None: # ugly backwards compat... 634 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0) 635 with self.lock: 636 verified_tx_mined_info = self.db.get_verified_tx(tx_hash) 637 if verified_tx_mined_info: 638 conf = max(self.get_local_height() - verified_tx_mined_info.height + 1, 0) 639 return verified_tx_mined_info._replace(conf=conf) 640 elif tx_hash in self.unverified_tx: 641 height = self.unverified_tx[tx_hash] 642 return TxMinedInfo(height=height, conf=0) 643 elif tx_hash in self.future_tx: 644 num_blocks_remainining = self.future_tx[tx_hash] - self.get_local_height() 645 if num_blocks_remainining > 0: 646 return TxMinedInfo(height=TX_HEIGHT_FUTURE, conf=-num_blocks_remainining) 647 else: 648 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0) 649 else: 650 # local transaction 651 return TxMinedInfo(height=TX_HEIGHT_LOCAL, conf=0) 652 653 def set_up_to_date(self, up_to_date): 654 with self.lock: 655 status_changed = self.up_to_date != up_to_date 656 self.up_to_date = up_to_date 657 if self.network: 658 self.network.notify('status') 659 if status_changed: 660 self.logger.info(f'set_up_to_date: {up_to_date}') 661 662 def is_up_to_date(self): 663 with self.lock: return self.up_to_date 664 665 def get_history_sync_state_details(self) -> Tuple[int, int]: 666 if self.synchronizer: 667 return self.synchronizer.num_requests_sent_and_answered() 668 else: 669 return 0, 0 670 671 @with_transaction_lock 672 def get_tx_delta(self, tx_hash: str, address: str) -> int: 673 """effect of tx on address""" 674 delta = 0 675 # subtract the value of coins sent from address 676 d = self.db.get_txi_addr(tx_hash, address) 677 for n, v in d: 678 delta -= v 679 # add the value of the coins received at address 680 d = self.db.get_txo_addr(tx_hash, address) 681 for n, (v, cb) in d.items(): 682 delta += v 683 return delta 684 685 def get_wallet_delta(self, tx: Transaction) -> TxWalletDelta: 686 """effect of tx on wallet""" 687 is_relevant = False # "related to wallet?" 688 num_input_ismine = 0 689 v_in = v_in_mine = v_out = v_out_mine = 0 690 with self.lock, self.transaction_lock: 691 for txin in tx.inputs(): 692 addr = self.get_txin_address(txin) 693 value = self.get_txin_value(txin, address=addr) 694 if self.is_mine(addr): 695 num_input_ismine += 1 696 is_relevant = True 697 assert value is not None 698 v_in_mine += value 699 if value is None: 700 v_in = None 701 elif v_in is not None: 702 v_in += value 703 for txout in tx.outputs(): 704 v_out += txout.value 705 if self.is_mine(txout.address): 706 v_out_mine += txout.value 707 is_relevant = True 708 delta = v_out_mine - v_in_mine 709 if v_in is not None: 710 fee = v_in - v_out 711 else: 712 fee = None 713 if fee is None and isinstance(tx, PartialTransaction): 714 fee = tx.get_fee() 715 return TxWalletDelta( 716 is_relevant=is_relevant, 717 is_any_input_ismine=num_input_ismine > 0, 718 is_all_input_ismine=num_input_ismine == len(tx.inputs()), 719 delta=delta, 720 fee=fee, 721 ) 722 723 def get_tx_fee(self, txid: str) -> Optional[int]: 724 """ Returns tx_fee or None. Use server fee only if tx is unconfirmed and not mine""" 725 # check if stored fee is available 726 fee = self.db.get_tx_fee(txid, trust_server=False) 727 if fee is not None: 728 return fee 729 # delete server-sent fee for confirmed txns 730 confirmed = self.get_tx_height(txid).conf > 0 731 if confirmed: 732 self.db.add_tx_fee_from_server(txid, None) 733 # if all inputs are ismine, try to calc fee now; 734 # otherwise, return stored value 735 num_all_inputs = self.db.get_num_all_inputs_of_tx(txid) 736 if num_all_inputs is not None: 737 # check if tx is mine 738 num_ismine_inputs = self.db.get_num_ismine_inputs_of_tx(txid) 739 assert num_ismine_inputs <= num_all_inputs, (num_ismine_inputs, num_all_inputs) 740 # trust server if tx is unconfirmed and not mine 741 if num_ismine_inputs < num_all_inputs: 742 return None if confirmed else self.db.get_tx_fee(txid, trust_server=True) 743 # lookup tx and deserialize it. 744 # note that deserializing is expensive, hence above hacks 745 tx = self.db.get_transaction(txid) 746 if not tx: 747 return None 748 fee = self.get_wallet_delta(tx).fee 749 # save result 750 self.db.add_tx_fee_we_calculated(txid, fee) 751 self.db.add_num_inputs_to_tx(txid, len(tx.inputs())) 752 return fee 753 754 def get_addr_io(self, address): 755 with self.lock, self.transaction_lock: 756 h = self.get_address_history(address) 757 received = {} 758 sent = {} 759 for tx_hash, height in h: 760 d = self.db.get_txo_addr(tx_hash, address) 761 for n, (v, is_cb) in d.items(): 762 received[tx_hash + ':%d'%n] = (height, v, is_cb) 763 for tx_hash, height in h: 764 l = self.db.get_txi_addr(tx_hash, address) 765 for txi, v in l: 766 sent[txi] = height 767 return received, sent 768 769 770 def get_addr_outputs(self, address: str) -> Dict[TxOutpoint, PartialTxInput]: 771 coins, spent = self.get_addr_io(address) 772 out = {} 773 for prevout_str, v in coins.items(): 774 tx_height, value, is_cb = v 775 prevout = TxOutpoint.from_str(prevout_str) 776 utxo = PartialTxInput(prevout=prevout, is_coinbase_output=is_cb) 777 utxo._trusted_address = address 778 utxo._trusted_value_sats = value 779 utxo.block_height = tx_height 780 utxo.spent_height = spent.get(prevout_str, None) 781 out[prevout] = utxo 782 return out 783 784 def get_addr_utxo(self, address: str) -> Dict[TxOutpoint, PartialTxInput]: 785 out = self.get_addr_outputs(address) 786 for k, v in list(out.items()): 787 if v.spent_height is not None: 788 out.pop(k) 789 return out 790 791 # return the total amount ever received by an address 792 def get_addr_received(self, address): 793 received, sent = self.get_addr_io(address) 794 return sum([v for height, v, is_cb in received.values()]) 795 796 @with_local_height_cached 797 def get_addr_balance(self, address, *, excluded_coins: Set[str] = None) -> Tuple[int, int, int]: 798 """Return the balance of a bitcoin address: 799 confirmed and matured, unconfirmed, unmatured 800 """ 801 if not excluded_coins: # cache is only used if there are no excluded_coins 802 cached_value = self._get_addr_balance_cache.get(address) 803 if cached_value: 804 return cached_value 805 if excluded_coins is None: 806 excluded_coins = set() 807 assert isinstance(excluded_coins, set), f"excluded_coins should be set, not {type(excluded_coins)}" 808 received, sent = self.get_addr_io(address) 809 c = u = x = 0 810 mempool_height = self.get_local_height() + 1 # height of next block 811 for txo, (tx_height, v, is_cb) in received.items(): 812 if txo in excluded_coins: 813 continue 814 if is_cb and tx_height + COINBASE_MATURITY > mempool_height: 815 x += v 816 elif tx_height > 0: 817 c += v 818 else: 819 u += v 820 if txo in sent: 821 if sent[txo] > 0: 822 c -= v 823 else: 824 u -= v 825 result = c, u, x 826 # cache result. 827 if not excluded_coins: 828 # Cache needs to be invalidated if a transaction is added to/ 829 # removed from history; or on new blocks (maturity...) 830 self._get_addr_balance_cache[address] = result 831 return result 832 833 @with_local_height_cached 834 def get_utxos( 835 self, 836 domain=None, 837 *, 838 excluded_addresses=None, 839 mature_only: bool = False, 840 confirmed_funding_only: bool = False, 841 confirmed_spending_only: bool = False, 842 nonlocal_only: bool = False, 843 block_height: int = None, 844 ) -> Sequence[PartialTxInput]: 845 if block_height is not None: 846 # caller wants the UTXOs we had at a given height; check other parameters 847 assert confirmed_funding_only 848 assert confirmed_spending_only 849 assert nonlocal_only 850 else: 851 block_height = self.get_local_height() 852 coins = [] 853 if domain is None: 854 domain = self.get_addresses() 855 domain = set(domain) 856 if excluded_addresses: 857 domain = set(domain) - set(excluded_addresses) 858 mempool_height = block_height + 1 # height of next block 859 for addr in domain: 860 txos = self.get_addr_outputs(addr) 861 for txo in txos.values(): 862 if txo.spent_height is not None: 863 if not confirmed_spending_only: 864 continue 865 if confirmed_spending_only and 0 < txo.spent_height <= block_height: 866 continue 867 if confirmed_funding_only and not (0 < txo.block_height <= block_height): 868 continue 869 if nonlocal_only and txo.block_height in (TX_HEIGHT_LOCAL, TX_HEIGHT_FUTURE): 870 continue 871 if (mature_only and txo.is_coinbase_output() 872 and txo.block_height + COINBASE_MATURITY > mempool_height): 873 continue 874 coins.append(txo) 875 continue 876 return coins 877 878 def get_balance(self, domain=None, *, excluded_addresses: Set[str] = None, 879 excluded_coins: Set[str] = None) -> Tuple[int, int, int]: 880 if domain is None: 881 domain = self.get_addresses() 882 if excluded_addresses is None: 883 excluded_addresses = set() 884 assert isinstance(excluded_addresses, set), f"excluded_addresses should be set, not {type(excluded_addresses)}" 885 domain = set(domain) - excluded_addresses 886 cc = uu = xx = 0 887 for addr in domain: 888 c, u, x = self.get_addr_balance(addr, excluded_coins=excluded_coins) 889 cc += c 890 uu += u 891 xx += x 892 return cc, uu, xx 893 894 def is_used(self, address: str) -> bool: 895 return self.get_address_history_len(address) != 0 896 897 def is_empty(self, address: str) -> bool: 898 c, u, x = self.get_addr_balance(address) 899 return c+u+x == 0 900 901 def synchronize(self): 902 pass 903