1// Copyright (c) 2013-2017 The btcsuite developers 2// Copyright (c) 2015-2017 The Decred developers 3// Use of this source code is governed by an ISC 4// license that can be found in the LICENSE file. 5 6package main 7 8import ( 9 "bytes" 10 "container/list" 11 "crypto/sha256" 12 "crypto/subtle" 13 "encoding/base64" 14 "encoding/hex" 15 "encoding/json" 16 "errors" 17 "fmt" 18 "io" 19 "math" 20 "sync" 21 "time" 22 23 "github.com/btcsuite/btcd/blockchain" 24 "github.com/btcsuite/btcd/btcjson" 25 "github.com/btcsuite/btcd/chaincfg" 26 "github.com/btcsuite/btcd/chaincfg/chainhash" 27 "github.com/btcsuite/btcd/database" 28 "github.com/btcsuite/btcd/txscript" 29 "github.com/btcsuite/btcd/wire" 30 "github.com/btcsuite/btcutil" 31 "github.com/btcsuite/websocket" 32 "golang.org/x/crypto/ripemd160" 33) 34 35const ( 36 // websocketSendBufferSize is the number of elements the send channel 37 // can queue before blocking. Note that this only applies to requests 38 // handled directly in the websocket client input handler or the async 39 // handler since notifications have their own queuing mechanism 40 // independent of the send channel buffer. 41 websocketSendBufferSize = 50 42) 43 44type semaphore chan struct{} 45 46func makeSemaphore(n int) semaphore { 47 return make(chan struct{}, n) 48} 49 50func (s semaphore) acquire() { s <- struct{}{} } 51func (s semaphore) release() { <-s } 52 53// timeZeroVal is simply the zero value for a time.Time and is used to avoid 54// creating multiple instances. 55var timeZeroVal time.Time 56 57// wsCommandHandler describes a callback function used to handle a specific 58// command. 59type wsCommandHandler func(*wsClient, interface{}) (interface{}, error) 60 61// wsHandlers maps RPC command strings to appropriate websocket handler 62// functions. This is set by init because help references wsHandlers and thus 63// causes a dependency loop. 
64var wsHandlers map[string]wsCommandHandler 65var wsHandlersBeforeInit = map[string]wsCommandHandler{ 66 "loadtxfilter": handleLoadTxFilter, 67 "help": handleWebsocketHelp, 68 "notifyblocks": handleNotifyBlocks, 69 "notifynewtransactions": handleNotifyNewTransactions, 70 "notifyreceived": handleNotifyReceived, 71 "notifyspent": handleNotifySpent, 72 "session": handleSession, 73 "stopnotifyblocks": handleStopNotifyBlocks, 74 "stopnotifynewtransactions": handleStopNotifyNewTransactions, 75 "stopnotifyspent": handleStopNotifySpent, 76 "stopnotifyreceived": handleStopNotifyReceived, 77 "rescan": handleRescan, 78 "rescanblocks": handleRescanBlocks, 79} 80 81// WebsocketHandler handles a new websocket client by creating a new wsClient, 82// starting it, and blocking until the connection closes. Since it blocks, it 83// must be run in a separate goroutine. It should be invoked from the websocket 84// server handler which runs each new connection in a new goroutine thereby 85// satisfying the requirement. 86func (s *rpcServer) WebsocketHandler(conn *websocket.Conn, remoteAddr string, 87 authenticated bool, isAdmin bool) { 88 89 // Clear the read deadline that was set before the websocket hijacked 90 // the connection. 91 conn.SetReadDeadline(timeZeroVal) 92 93 // Limit max number of websocket clients. 94 rpcsLog.Infof("New websocket client %s", remoteAddr) 95 if s.ntfnMgr.NumClients()+1 > cfg.RPCMaxWebsockets { 96 rpcsLog.Infof("Max websocket clients exceeded [%d] - "+ 97 "disconnecting client %s", cfg.RPCMaxWebsockets, 98 remoteAddr) 99 conn.Close() 100 return 101 } 102 103 // Create a new websocket client to handle the new websocket connection 104 // and wait for it to shutdown. Once it has shutdown (and hence 105 // disconnected), remove it and any notifications it registered for. 
106 client, err := newWebsocketClient(s, conn, remoteAddr, authenticated, isAdmin) 107 if err != nil { 108 rpcsLog.Errorf("Failed to serve client %s: %v", remoteAddr, err) 109 conn.Close() 110 return 111 } 112 s.ntfnMgr.AddClient(client) 113 client.Start() 114 client.WaitForShutdown() 115 s.ntfnMgr.RemoveClient(client) 116 rpcsLog.Infof("Disconnected websocket client %s", remoteAddr) 117} 118 119// wsNotificationManager is a connection and notification manager used for 120// websockets. It allows websocket clients to register for notifications they 121// are interested in. When an event happens elsewhere in the code such as 122// transactions being added to the memory pool or block connects/disconnects, 123// the notification manager is provided with the relevant details needed to 124// figure out which websocket clients need to be notified based on what they 125// have registered for and notifies them accordingly. It is also used to keep 126// track of all connected websocket clients. 127type wsNotificationManager struct { 128 // server is the RPC server the notification manager is associated with. 129 server *rpcServer 130 131 // queueNotification queues a notification for handling. 132 queueNotification chan interface{} 133 134 // notificationMsgs feeds notificationHandler with notifications 135 // and client (un)registeration requests from a queue as well as 136 // registeration and unregisteration requests from clients. 137 notificationMsgs chan interface{} 138 139 // Access channel for current number of connected clients. 140 numClients chan int 141 142 // Shutdown handling 143 wg sync.WaitGroup 144 quit chan struct{} 145} 146 147// queueHandler manages a queue of empty interfaces, reading from in and 148// sending the oldest unsent to out. This handler stops when either of the 149// in or quit channels are closed, and closes out before returning, without 150// waiting to send any variables still remaining in the queue. 
151func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) { 152 var q []interface{} 153 var dequeue chan<- interface{} 154 skipQueue := out 155 var next interface{} 156out: 157 for { 158 select { 159 case n, ok := <-in: 160 if !ok { 161 // Sender closed input channel. 162 break out 163 } 164 165 // Either send to out immediately if skipQueue is 166 // non-nil (queue is empty) and reader is ready, 167 // or append to the queue and send later. 168 select { 169 case skipQueue <- n: 170 default: 171 q = append(q, n) 172 dequeue = out 173 skipQueue = nil 174 next = q[0] 175 } 176 177 case dequeue <- next: 178 copy(q, q[1:]) 179 q[len(q)-1] = nil // avoid leak 180 q = q[:len(q)-1] 181 if len(q) == 0 { 182 dequeue = nil 183 skipQueue = out 184 } else { 185 next = q[0] 186 } 187 188 case <-quit: 189 break out 190 } 191 } 192 close(out) 193} 194 195// queueHandler maintains a queue of notifications and notification handler 196// control messages. 197func (m *wsNotificationManager) queueHandler() { 198 queueHandler(m.queueNotification, m.notificationMsgs, m.quit) 199 m.wg.Done() 200} 201 202// NotifyBlockConnected passes a block newly-connected to the best chain 203// to the notification manager for block and transaction notification 204// processing. 205func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) { 206 // As NotifyBlockConnected will be called by the block manager 207 // and the RPC server may no longer be running, use a select 208 // statement to unblock enqueuing the notification once the RPC 209 // server has begun shutting down. 210 select { 211 case m.queueNotification <- (*notificationBlockConnected)(block): 212 case <-m.quit: 213 } 214} 215 216// NotifyBlockDisconnected passes a block disconnected from the best chain 217// to the notification manager for block notification processing. 
218func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { 219 // As NotifyBlockDisconnected will be called by the block manager 220 // and the RPC server may no longer be running, use a select 221 // statement to unblock enqueuing the notification once the RPC 222 // server has begun shutting down. 223 select { 224 case m.queueNotification <- (*notificationBlockDisconnected)(block): 225 case <-m.quit: 226 } 227} 228 229// NotifyMempoolTx passes a transaction accepted by mempool to the 230// notification manager for transaction notification processing. If 231// isNew is true, the tx is is a new transaction, rather than one 232// added to the mempool during a reorg. 233func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) { 234 n := ¬ificationTxAcceptedByMempool{ 235 isNew: isNew, 236 tx: tx, 237 } 238 239 // As NotifyMempoolTx will be called by mempool and the RPC server 240 // may no longer be running, use a select statement to unblock 241 // enqueuing the notification once the RPC server has begun 242 // shutting down. 243 select { 244 case m.queueNotification <- n: 245 case <-m.quit: 246 } 247} 248 249// wsClientFilter tracks relevant addresses for each websocket client for 250// the `rescanblocks` extension. It is modified by the `loadtxfilter` command. 251// 252// NOTE: This extension was ported from github.com/decred/dcrd 253type wsClientFilter struct { 254 mu sync.Mutex 255 256 // Implemented fast paths for address lookup. 257 pubKeyHashes map[[ripemd160.Size]byte]struct{} 258 scriptHashes map[[ripemd160.Size]byte]struct{} 259 compressedPubKeys map[[33]byte]struct{} 260 uncompressedPubKeys map[[65]byte]struct{} 261 262 // A fallback address lookup map in case a fast path doesn't exist. 263 // Only exists for completeness. If using this shows up in a profile, 264 // there's a good chance a fast path should be added. 265 otherAddresses map[string]struct{} 266 267 // Outpoints of unspent outputs. 
268 unspent map[wire.OutPoint]struct{} 269} 270 271// newWSClientFilter creates a new, empty wsClientFilter struct to be used 272// for a websocket client. 273// 274// NOTE: This extension was ported from github.com/decred/dcrd 275func newWSClientFilter(addresses []string, unspentOutPoints []wire.OutPoint, params *chaincfg.Params) *wsClientFilter { 276 filter := &wsClientFilter{ 277 pubKeyHashes: map[[ripemd160.Size]byte]struct{}{}, 278 scriptHashes: map[[ripemd160.Size]byte]struct{}{}, 279 compressedPubKeys: map[[33]byte]struct{}{}, 280 uncompressedPubKeys: map[[65]byte]struct{}{}, 281 otherAddresses: map[string]struct{}{}, 282 unspent: make(map[wire.OutPoint]struct{}, len(unspentOutPoints)), 283 } 284 285 for _, s := range addresses { 286 filter.addAddressStr(s, params) 287 } 288 for i := range unspentOutPoints { 289 filter.addUnspentOutPoint(&unspentOutPoints[i]) 290 } 291 292 return filter 293} 294 295// addAddress adds an address to a wsClientFilter, treating it correctly based 296// on the type of address passed as an argument. 
297// 298// NOTE: This extension was ported from github.com/decred/dcrd 299func (f *wsClientFilter) addAddress(a btcutil.Address) { 300 switch a := a.(type) { 301 case *btcutil.AddressPubKeyHash: 302 f.pubKeyHashes[*a.Hash160()] = struct{}{} 303 return 304 case *btcutil.AddressScriptHash: 305 f.scriptHashes[*a.Hash160()] = struct{}{} 306 return 307 case *btcutil.AddressPubKey: 308 serializedPubKey := a.ScriptAddress() 309 switch len(serializedPubKey) { 310 case 33: // compressed 311 var compressedPubKey [33]byte 312 copy(compressedPubKey[:], serializedPubKey) 313 f.compressedPubKeys[compressedPubKey] = struct{}{} 314 return 315 case 65: // uncompressed 316 var uncompressedPubKey [65]byte 317 copy(uncompressedPubKey[:], serializedPubKey) 318 f.uncompressedPubKeys[uncompressedPubKey] = struct{}{} 319 return 320 } 321 } 322 323 f.otherAddresses[a.EncodeAddress()] = struct{}{} 324} 325 326// addAddressStr parses an address from a string and then adds it to the 327// wsClientFilter using addAddress. 328// 329// NOTE: This extension was ported from github.com/decred/dcrd 330func (f *wsClientFilter) addAddressStr(s string, params *chaincfg.Params) { 331 // If address can't be decoded, no point in saving it since it should also 332 // impossible to create the address from an inspected transaction output 333 // script. 334 a, err := btcutil.DecodeAddress(s, params) 335 if err != nil { 336 return 337 } 338 f.addAddress(a) 339} 340 341// existsAddress returns true if the passed address has been added to the 342// wsClientFilter. 
343// 344// NOTE: This extension was ported from github.com/decred/dcrd 345func (f *wsClientFilter) existsAddress(a btcutil.Address) bool { 346 switch a := a.(type) { 347 case *btcutil.AddressPubKeyHash: 348 _, ok := f.pubKeyHashes[*a.Hash160()] 349 return ok 350 case *btcutil.AddressScriptHash: 351 _, ok := f.scriptHashes[*a.Hash160()] 352 return ok 353 case *btcutil.AddressPubKey: 354 serializedPubKey := a.ScriptAddress() 355 switch len(serializedPubKey) { 356 case 33: // compressed 357 var compressedPubKey [33]byte 358 copy(compressedPubKey[:], serializedPubKey) 359 _, ok := f.compressedPubKeys[compressedPubKey] 360 if !ok { 361 _, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()] 362 } 363 return ok 364 case 65: // uncompressed 365 var uncompressedPubKey [65]byte 366 copy(uncompressedPubKey[:], serializedPubKey) 367 _, ok := f.uncompressedPubKeys[uncompressedPubKey] 368 if !ok { 369 _, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()] 370 } 371 return ok 372 } 373 } 374 375 _, ok := f.otherAddresses[a.EncodeAddress()] 376 return ok 377} 378 379// removeAddress removes the passed address, if it exists, from the 380// wsClientFilter. 
381// 382// NOTE: This extension was ported from github.com/decred/dcrd 383func (f *wsClientFilter) removeAddress(a btcutil.Address) { 384 switch a := a.(type) { 385 case *btcutil.AddressPubKeyHash: 386 delete(f.pubKeyHashes, *a.Hash160()) 387 return 388 case *btcutil.AddressScriptHash: 389 delete(f.scriptHashes, *a.Hash160()) 390 return 391 case *btcutil.AddressPubKey: 392 serializedPubKey := a.ScriptAddress() 393 switch len(serializedPubKey) { 394 case 33: // compressed 395 var compressedPubKey [33]byte 396 copy(compressedPubKey[:], serializedPubKey) 397 delete(f.compressedPubKeys, compressedPubKey) 398 return 399 case 65: // uncompressed 400 var uncompressedPubKey [65]byte 401 copy(uncompressedPubKey[:], serializedPubKey) 402 delete(f.uncompressedPubKeys, uncompressedPubKey) 403 return 404 } 405 } 406 407 delete(f.otherAddresses, a.EncodeAddress()) 408} 409 410// removeAddressStr parses an address from a string and then removes it from the 411// wsClientFilter using removeAddress. 412// 413// NOTE: This extension was ported from github.com/decred/dcrd 414func (f *wsClientFilter) removeAddressStr(s string, params *chaincfg.Params) { 415 a, err := btcutil.DecodeAddress(s, params) 416 if err == nil { 417 f.removeAddress(a) 418 } else { 419 delete(f.otherAddresses, s) 420 } 421} 422 423// addUnspentOutPoint adds an outpoint to the wsClientFilter. 424// 425// NOTE: This extension was ported from github.com/decred/dcrd 426func (f *wsClientFilter) addUnspentOutPoint(op *wire.OutPoint) { 427 f.unspent[*op] = struct{}{} 428} 429 430// existsUnspentOutPoint returns true if the passed outpoint has been added to 431// the wsClientFilter. 432// 433// NOTE: This extension was ported from github.com/decred/dcrd 434func (f *wsClientFilter) existsUnspentOutPoint(op *wire.OutPoint) bool { 435 _, ok := f.unspent[*op] 436 return ok 437} 438 439// removeUnspentOutPoint removes the passed outpoint, if it exists, from the 440// wsClientFilter. 
441// 442// NOTE: This extension was ported from github.com/decred/dcrd 443func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) { 444 delete(f.unspent, *op) 445} 446 447// Notification types 448type notificationBlockConnected btcutil.Block 449type notificationBlockDisconnected btcutil.Block 450type notificationTxAcceptedByMempool struct { 451 isNew bool 452 tx *btcutil.Tx 453} 454 455// Notification control requests 456type notificationRegisterClient wsClient 457type notificationUnregisterClient wsClient 458type notificationRegisterBlocks wsClient 459type notificationUnregisterBlocks wsClient 460type notificationRegisterNewMempoolTxs wsClient 461type notificationUnregisterNewMempoolTxs wsClient 462type notificationRegisterSpent struct { 463 wsc *wsClient 464 ops []*wire.OutPoint 465} 466type notificationUnregisterSpent struct { 467 wsc *wsClient 468 op *wire.OutPoint 469} 470type notificationRegisterAddr struct { 471 wsc *wsClient 472 addrs []string 473} 474type notificationUnregisterAddr struct { 475 wsc *wsClient 476 addr string 477} 478 479// notificationHandler reads notifications and control messages from the queue 480// handler and processes one at a time. 481func (m *wsNotificationManager) notificationHandler() { 482 // clients is a map of all currently connected websocket clients. 483 clients := make(map[chan struct{}]*wsClient) 484 485 // Maps used to hold lists of websocket clients to be notified on 486 // certain events. Each websocket client also keeps maps for the events 487 // which have multiple triggers to make removal from these lists on 488 // connection close less horrendously expensive. 489 // 490 // Where possible, the quit channel is used as the unique id for a client 491 // since it is quite a bit more efficient than using the entire struct. 
492 blockNotifications := make(map[chan struct{}]*wsClient) 493 txNotifications := make(map[chan struct{}]*wsClient) 494 watchedOutPoints := make(map[wire.OutPoint]map[chan struct{}]*wsClient) 495 watchedAddrs := make(map[string]map[chan struct{}]*wsClient) 496 497out: 498 for { 499 select { 500 case n, ok := <-m.notificationMsgs: 501 if !ok { 502 // queueHandler quit. 503 break out 504 } 505 switch n := n.(type) { 506 case *notificationBlockConnected: 507 block := (*btcutil.Block)(n) 508 509 // Skip iterating through all txs if no 510 // tx notification requests exist. 511 if len(watchedOutPoints) != 0 || len(watchedAddrs) != 0 { 512 for _, tx := range block.Transactions() { 513 m.notifyForTx(watchedOutPoints, 514 watchedAddrs, tx, block) 515 } 516 } 517 518 if len(blockNotifications) != 0 { 519 m.notifyBlockConnected(blockNotifications, 520 block) 521 m.notifyFilteredBlockConnected(blockNotifications, 522 block) 523 } 524 525 case *notificationBlockDisconnected: 526 block := (*btcutil.Block)(n) 527 528 if len(blockNotifications) != 0 { 529 m.notifyBlockDisconnected(blockNotifications, 530 block) 531 m.notifyFilteredBlockDisconnected(blockNotifications, 532 block) 533 } 534 535 case *notificationTxAcceptedByMempool: 536 if n.isNew && len(txNotifications) != 0 { 537 m.notifyForNewTx(txNotifications, n.tx) 538 } 539 m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil) 540 m.notifyRelevantTxAccepted(n.tx, clients) 541 542 case *notificationRegisterBlocks: 543 wsc := (*wsClient)(n) 544 blockNotifications[wsc.quit] = wsc 545 546 case *notificationUnregisterBlocks: 547 wsc := (*wsClient)(n) 548 delete(blockNotifications, wsc.quit) 549 550 case *notificationRegisterClient: 551 wsc := (*wsClient)(n) 552 clients[wsc.quit] = wsc 553 554 case *notificationUnregisterClient: 555 wsc := (*wsClient)(n) 556 // Remove any requests made by the client as well as 557 // the client itself. 
558 delete(blockNotifications, wsc.quit) 559 delete(txNotifications, wsc.quit) 560 for k := range wsc.spentRequests { 561 op := k 562 m.removeSpentRequest(watchedOutPoints, wsc, &op) 563 } 564 for addr := range wsc.addrRequests { 565 m.removeAddrRequest(watchedAddrs, wsc, addr) 566 } 567 delete(clients, wsc.quit) 568 569 case *notificationRegisterSpent: 570 m.addSpentRequests(watchedOutPoints, n.wsc, n.ops) 571 572 case *notificationUnregisterSpent: 573 m.removeSpentRequest(watchedOutPoints, n.wsc, n.op) 574 575 case *notificationRegisterAddr: 576 m.addAddrRequests(watchedAddrs, n.wsc, n.addrs) 577 578 case *notificationUnregisterAddr: 579 m.removeAddrRequest(watchedAddrs, n.wsc, n.addr) 580 581 case *notificationRegisterNewMempoolTxs: 582 wsc := (*wsClient)(n) 583 txNotifications[wsc.quit] = wsc 584 585 case *notificationUnregisterNewMempoolTxs: 586 wsc := (*wsClient)(n) 587 delete(txNotifications, wsc.quit) 588 589 default: 590 rpcsLog.Warn("Unhandled notification type") 591 } 592 593 case m.numClients <- len(clients): 594 595 case <-m.quit: 596 // RPC server shutting down. 597 break out 598 } 599 } 600 601 for _, c := range clients { 602 c.Disconnect() 603 } 604 m.wg.Done() 605} 606 607// NumClients returns the number of clients actively being served. 608func (m *wsNotificationManager) NumClients() (n int) { 609 select { 610 case n = <-m.numClients: 611 case <-m.quit: // Use default n (0) if server has shut down. 612 } 613 return 614} 615 616// RegisterBlockUpdates requests block update notifications to the passed 617// websocket client. 618func (m *wsNotificationManager) RegisterBlockUpdates(wsc *wsClient) { 619 m.queueNotification <- (*notificationRegisterBlocks)(wsc) 620} 621 622// UnregisterBlockUpdates removes block update notifications for the passed 623// websocket client. 
624func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) { 625 m.queueNotification <- (*notificationUnregisterBlocks)(wsc) 626} 627 628// subscribedClients returns the set of all websocket client quit channels that 629// are registered to receive notifications regarding tx, either due to tx 630// spending a watched output or outputting to a watched address. Matching 631// client's filters are updated based on this transaction's outputs and output 632// addresses that may be relevant for a client. 633func (m *wsNotificationManager) subscribedClients(tx *btcutil.Tx, 634 clients map[chan struct{}]*wsClient) map[chan struct{}]struct{} { 635 636 // Use a map of client quit channels as keys to prevent duplicates when 637 // multiple inputs and/or outputs are relevant to the client. 638 subscribed := make(map[chan struct{}]struct{}) 639 640 msgTx := tx.MsgTx() 641 for _, input := range msgTx.TxIn { 642 for quitChan, wsc := range clients { 643 wsc.Lock() 644 filter := wsc.filterData 645 wsc.Unlock() 646 if filter == nil { 647 continue 648 } 649 filter.mu.Lock() 650 if filter.existsUnspentOutPoint(&input.PreviousOutPoint) { 651 subscribed[quitChan] = struct{}{} 652 } 653 filter.mu.Unlock() 654 } 655 } 656 657 for i, output := range msgTx.TxOut { 658 _, addrs, _, err := txscript.ExtractPkScriptAddrs( 659 output.PkScript, m.server.cfg.ChainParams) 660 if err != nil { 661 // Clients are not able to subscribe to 662 // nonstandard or non-address outputs. 
663 continue 664 } 665 for quitChan, wsc := range clients { 666 wsc.Lock() 667 filter := wsc.filterData 668 wsc.Unlock() 669 if filter == nil { 670 continue 671 } 672 filter.mu.Lock() 673 for _, a := range addrs { 674 if filter.existsAddress(a) { 675 subscribed[quitChan] = struct{}{} 676 op := wire.OutPoint{ 677 Hash: *tx.Hash(), 678 Index: uint32(i), 679 } 680 filter.addUnspentOutPoint(&op) 681 } 682 } 683 filter.mu.Unlock() 684 } 685 } 686 687 return subscribed 688} 689 690// notifyBlockConnected notifies websocket clients that have registered for 691// block updates when a block is connected to the main chain. 692func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient, 693 block *btcutil.Block) { 694 695 // Notify interested websocket clients about the connected block. 696 ntfn := btcjson.NewBlockConnectedNtfn(block.Hash().String(), block.Height(), 697 block.MsgBlock().Header.Timestamp.Unix()) 698 marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) 699 if err != nil { 700 rpcsLog.Errorf("Failed to marshal block connected notification: "+ 701 "%v", err) 702 return 703 } 704 for _, wsc := range clients { 705 wsc.QueueNotification(marshalledJSON) 706 } 707} 708 709// notifyBlockDisconnected notifies websocket clients that have registered for 710// block updates when a block is disconnected from the main chain (due to a 711// reorganize). 712func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*wsClient, block *btcutil.Block) { 713 // Skip notification creation if no clients have requested block 714 // connected/disconnected notifications. 715 if len(clients) == 0 { 716 return 717 } 718 719 // Notify interested websocket clients about the disconnected block. 
720 ntfn := btcjson.NewBlockDisconnectedNtfn(block.Hash().String(), 721 block.Height(), block.MsgBlock().Header.Timestamp.Unix()) 722 marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) 723 if err != nil { 724 rpcsLog.Errorf("Failed to marshal block disconnected "+ 725 "notification: %v", err) 726 return 727 } 728 for _, wsc := range clients { 729 wsc.QueueNotification(marshalledJSON) 730 } 731} 732 733// notifyFilteredBlockConnected notifies websocket clients that have registered for 734// block updates when a block is connected to the main chain. 735func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan struct{}]*wsClient, 736 block *btcutil.Block) { 737 738 // Create the common portion of the notification that is the same for 739 // every client. 740 var w bytes.Buffer 741 err := block.MsgBlock().Header.Serialize(&w) 742 if err != nil { 743 rpcsLog.Errorf("Failed to serialize header for filtered block "+ 744 "connected notification: %v", err) 745 return 746 } 747 ntfn := btcjson.NewFilteredBlockConnectedNtfn(block.Height(), 748 hex.EncodeToString(w.Bytes()), nil) 749 750 // Search for relevant transactions for each client and save them 751 // serialized in hex encoding for the notification. 752 subscribedTxs := make(map[chan struct{}][]string) 753 for _, tx := range block.Transactions() { 754 var txHex string 755 for quitChan := range m.subscribedClients(tx, clients) { 756 if txHex == "" { 757 txHex = txHexString(tx.MsgTx()) 758 } 759 subscribedTxs[quitChan] = append(subscribedTxs[quitChan], txHex) 760 } 761 } 762 for quitChan, wsc := range clients { 763 // Add all discovered transactions for this client. For clients 764 // that have no new-style filter, add the empty string slice. 765 ntfn.SubscribedTxs = subscribedTxs[quitChan] 766 767 // Marshal and queue notification. 
768 marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) 769 if err != nil { 770 rpcsLog.Errorf("Failed to marshal filtered block "+ 771 "connected notification: %v", err) 772 return 773 } 774 wsc.QueueNotification(marshalledJSON) 775 } 776} 777 778// notifyFilteredBlockDisconnected notifies websocket clients that have registered for 779// block updates when a block is disconnected from the main chain (due to a 780// reorganize). 781func (*wsNotificationManager) notifyFilteredBlockDisconnected(clients map[chan struct{}]*wsClient, 782 block *btcutil.Block) { 783 // Skip notification creation if no clients have requested block 784 // connected/disconnected notifications. 785 if len(clients) == 0 { 786 return 787 } 788 789 // Notify interested websocket clients about the disconnected block. 790 var w bytes.Buffer 791 err := block.MsgBlock().Header.Serialize(&w) 792 if err != nil { 793 rpcsLog.Errorf("Failed to serialize header for filtered block "+ 794 "disconnected notification: %v", err) 795 return 796 } 797 ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(), 798 hex.EncodeToString(w.Bytes())) 799 marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) 800 if err != nil { 801 rpcsLog.Errorf("Failed to marshal filtered block disconnected "+ 802 "notification: %v", err) 803 return 804 } 805 for _, wsc := range clients { 806 wsc.QueueNotification(marshalledJSON) 807 } 808} 809 810// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket 811// client when new transactions are added to the memory pool. 812func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) { 813 m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc) 814} 815 816// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket 817// client when new transaction are added to the memory pool. 
818func (m *wsNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) { 819 m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc) 820} 821 822// notifyForNewTx notifies websocket clients that have registered for updates 823// when a new transaction is added to the memory pool. 824func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClient, tx *btcutil.Tx) { 825 txHashStr := tx.Hash().String() 826 mtx := tx.MsgTx() 827 828 var amount int64 829 for _, txOut := range mtx.TxOut { 830 amount += txOut.Value 831 } 832 833 ntfn := btcjson.NewTxAcceptedNtfn(txHashStr, btcutil.Amount(amount).ToBTC()) 834 marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) 835 if err != nil { 836 rpcsLog.Errorf("Failed to marshal tx notification: %s", err.Error()) 837 return 838 } 839 840 var verboseNtfn *btcjson.TxAcceptedVerboseNtfn 841 var marshalledJSONVerbose []byte 842 for _, wsc := range clients { 843 if wsc.verboseTxUpdates { 844 if marshalledJSONVerbose != nil { 845 wsc.QueueNotification(marshalledJSONVerbose) 846 continue 847 } 848 849 net := m.server.cfg.ChainParams 850 rawTx, err := createTxRawResult(net, mtx, txHashStr, nil, 851 "", 0, 0) 852 if err != nil { 853 return 854 } 855 856 verboseNtfn = btcjson.NewTxAcceptedVerboseNtfn(*rawTx) 857 marshalledJSONVerbose, err = btcjson.MarshalCmd(nil, 858 verboseNtfn) 859 if err != nil { 860 rpcsLog.Errorf("Failed to marshal verbose tx "+ 861 "notification: %s", err.Error()) 862 return 863 } 864 wsc.QueueNotification(marshalledJSONVerbose) 865 } else { 866 wsc.QueueNotification(marshalledJSON) 867 } 868 } 869} 870 871// RegisterSpentRequests requests a notification when each of the passed 872// outpoints is confirmed spent (contained in a block connected to the main 873// chain) for the passed websocket client. The request is automatically 874// removed once the notification has been sent. 
875func (m *wsNotificationManager) RegisterSpentRequests(wsc *wsClient, ops []*wire.OutPoint) { 876 m.queueNotification <- ¬ificationRegisterSpent{ 877 wsc: wsc, 878 ops: ops, 879 } 880} 881 882// addSpentRequests modifies a map of watched outpoints to sets of websocket 883// clients to add a new request watch all of the outpoints in ops and create 884// and send a notification when spent to the websocket client wsc. 885func (m *wsNotificationManager) addSpentRequests(opMap map[wire.OutPoint]map[chan struct{}]*wsClient, 886 wsc *wsClient, ops []*wire.OutPoint) { 887 888 for _, op := range ops { 889 // Track the request in the client as well so it can be quickly 890 // be removed on disconnect. 891 wsc.spentRequests[*op] = struct{}{} 892 893 // Add the client to the list to notify when the outpoint is seen. 894 // Create the list as needed. 895 cmap, ok := opMap[*op] 896 if !ok { 897 cmap = make(map[chan struct{}]*wsClient) 898 opMap[*op] = cmap 899 } 900 cmap[wsc.quit] = wsc 901 } 902 903 // Check if any transactions spending these outputs already exists in 904 // the mempool, if so send the notification immediately. 905 spends := make(map[chainhash.Hash]*btcutil.Tx) 906 for _, op := range ops { 907 spend := m.server.cfg.TxMemPool.CheckSpend(*op) 908 if spend != nil { 909 rpcsLog.Debugf("Found existing mempool spend for "+ 910 "outpoint<%v>: %v", op, spend.Hash()) 911 spends[*spend.Hash()] = spend 912 } 913 } 914 915 for _, spend := range spends { 916 m.notifyForTx(opMap, nil, spend, nil) 917 } 918} 919 920// UnregisterSpentRequest removes a request from the passed websocket client 921// to be notified when the passed outpoint is confirmed spent (contained in a 922// block connected to the main chain). 
923func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *wire.OutPoint) { 924 m.queueNotification <- ¬ificationUnregisterSpent{ 925 wsc: wsc, 926 op: op, 927 } 928} 929 930// removeSpentRequest modifies a map of watched outpoints to remove the 931// websocket client wsc from the set of clients to be notified when a 932// watched outpoint is spent. If wsc is the last client, the outpoint 933// key is removed from the map. 934func (*wsNotificationManager) removeSpentRequest(ops map[wire.OutPoint]map[chan struct{}]*wsClient, 935 wsc *wsClient, op *wire.OutPoint) { 936 937 // Remove the request tracking from the client. 938 delete(wsc.spentRequests, *op) 939 940 // Remove the client from the list to notify. 941 notifyMap, ok := ops[*op] 942 if !ok { 943 rpcsLog.Warnf("Attempt to remove nonexistent spent request "+ 944 "for websocket client %s", wsc.addr) 945 return 946 } 947 delete(notifyMap, wsc.quit) 948 949 // Remove the map entry altogether if there are 950 // no more clients interested in it. 951 if len(notifyMap) == 0 { 952 delete(ops, *op) 953 } 954} 955 956// txHexString returns the serialized transaction encoded in hexadecimal. 957func txHexString(tx *wire.MsgTx) string { 958 buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize())) 959 // Ignore Serialize's error, as writing to a bytes.buffer cannot fail. 960 tx.Serialize(buf) 961 return hex.EncodeToString(buf.Bytes()) 962} 963 964// blockDetails creates a BlockDetails struct to include in btcws notifications 965// from a block and a transaction's block index. 966func blockDetails(block *btcutil.Block, txIndex int) *btcjson.BlockDetails { 967 if block == nil { 968 return nil 969 } 970 return &btcjson.BlockDetails{ 971 Height: block.Height(), 972 Hash: block.Hash().String(), 973 Index: txIndex, 974 Time: block.MsgBlock().Header.Timestamp.Unix(), 975 } 976} 977 978// newRedeemingTxNotification returns a new marshalled redeemingtx notification 979// with the passed parameters. 
980func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) { 981 // Create and marshal the notification. 982 ntfn := btcjson.NewRedeemingTxNtfn(txHex, blockDetails(block, index)) 983 return btcjson.MarshalCmd(nil, ntfn) 984} 985 986// notifyForTxOuts examines each transaction output, notifying interested 987// websocket clients of the transaction if an output spends to a watched 988// address. A spent notification request is automatically registered for 989// the client for each matching output. 990func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan struct{}]*wsClient, 991 addrs map[string]map[chan struct{}]*wsClient, tx *btcutil.Tx, block *btcutil.Block) { 992 993 // Nothing to do if nobody is listening for address notifications. 994 if len(addrs) == 0 { 995 return 996 } 997 998 txHex := "" 999 wscNotified := make(map[chan struct{}]struct{}) 1000 for i, txOut := range tx.MsgTx().TxOut { 1001 _, txAddrs, _, err := txscript.ExtractPkScriptAddrs( 1002 txOut.PkScript, m.server.cfg.ChainParams) 1003 if err != nil { 1004 continue 1005 } 1006 1007 for _, txAddr := range txAddrs { 1008 cmap, ok := addrs[txAddr.EncodeAddress()] 1009 if !ok { 1010 continue 1011 } 1012 1013 if txHex == "" { 1014 txHex = txHexString(tx.MsgTx()) 1015 } 1016 ntfn := btcjson.NewRecvTxNtfn(txHex, blockDetails(block, 1017 tx.Index())) 1018 1019 marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) 1020 if err != nil { 1021 rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err) 1022 continue 1023 } 1024 1025 op := []*wire.OutPoint{wire.NewOutPoint(tx.Hash(), uint32(i))} 1026 for wscQuit, wsc := range cmap { 1027 m.addSpentRequests(ops, wsc, op) 1028 1029 if _, ok := wscNotified[wscQuit]; !ok { 1030 wscNotified[wscQuit] = struct{}{} 1031 wsc.QueueNotification(marshalledJSON) 1032 } 1033 } 1034 } 1035 } 1036} 1037 1038// notifyRelevantTxAccepted examines the inputs and outputs of the passed 1039// transaction, notifying 
websocket clients of outputs spending to a watched 1040// address and inputs spending a watched outpoint. Any outputs paying to a 1041// watched address result in the output being watched as well for future 1042// notifications. 1043func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *btcutil.Tx, 1044 clients map[chan struct{}]*wsClient) { 1045 1046 clientsToNotify := m.subscribedClients(tx, clients) 1047 1048 if len(clientsToNotify) != 0 { 1049 n := btcjson.NewRelevantTxAcceptedNtfn(txHexString(tx.MsgTx())) 1050 marshalled, err := btcjson.MarshalCmd(nil, n) 1051 if err != nil { 1052 rpcsLog.Errorf("Failed to marshal notification: %v", err) 1053 return 1054 } 1055 for quitChan := range clientsToNotify { 1056 clients[quitChan].QueueNotification(marshalled) 1057 } 1058 } 1059} 1060 1061// notifyForTx examines the inputs and outputs of the passed transaction, 1062// notifying websocket clients of outputs spending to a watched address 1063// and inputs spending a watched outpoint. 1064func (m *wsNotificationManager) notifyForTx(ops map[wire.OutPoint]map[chan struct{}]*wsClient, 1065 addrs map[string]map[chan struct{}]*wsClient, tx *btcutil.Tx, block *btcutil.Block) { 1066 1067 if len(ops) != 0 { 1068 m.notifyForTxIns(ops, tx, block) 1069 } 1070 if len(addrs) != 0 { 1071 m.notifyForTxOuts(ops, addrs, tx, block) 1072 } 1073} 1074 1075// notifyForTxIns examines the inputs of the passed transaction and sends 1076// interested websocket clients a redeemingtx notification if any inputs 1077// spend a watched output. If block is non-nil, any matching spent 1078// requests are removed. 1079func (m *wsNotificationManager) notifyForTxIns(ops map[wire.OutPoint]map[chan struct{}]*wsClient, 1080 tx *btcutil.Tx, block *btcutil.Block) { 1081 1082 // Nothing to do if nobody is watching outpoints. 
1083 if len(ops) == 0 { 1084 return 1085 } 1086 1087 txHex := "" 1088 wscNotified := make(map[chan struct{}]struct{}) 1089 for _, txIn := range tx.MsgTx().TxIn { 1090 prevOut := &txIn.PreviousOutPoint 1091 if cmap, ok := ops[*prevOut]; ok { 1092 if txHex == "" { 1093 txHex = txHexString(tx.MsgTx()) 1094 } 1095 marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), block) 1096 if err != nil { 1097 rpcsLog.Warnf("Failed to marshal redeemingtx notification: %v", err) 1098 continue 1099 } 1100 for wscQuit, wsc := range cmap { 1101 if block != nil { 1102 m.removeSpentRequest(ops, wsc, prevOut) 1103 } 1104 1105 if _, ok := wscNotified[wscQuit]; !ok { 1106 wscNotified[wscQuit] = struct{}{} 1107 wsc.QueueNotification(marshalledJSON) 1108 } 1109 } 1110 } 1111 } 1112} 1113 1114// RegisterTxOutAddressRequests requests notifications to the passed websocket 1115// client when a transaction output spends to the passed address. 1116func (m *wsNotificationManager) RegisterTxOutAddressRequests(wsc *wsClient, addrs []string) { 1117 m.queueNotification <- ¬ificationRegisterAddr{ 1118 wsc: wsc, 1119 addrs: addrs, 1120 } 1121} 1122 1123// addAddrRequests adds the websocket client wsc to the address to client set 1124// addrMap so wsc will be notified for any mempool or block transaction outputs 1125// spending to any of the addresses in addrs. 1126func (*wsNotificationManager) addAddrRequests(addrMap map[string]map[chan struct{}]*wsClient, 1127 wsc *wsClient, addrs []string) { 1128 1129 for _, addr := range addrs { 1130 // Track the request in the client as well so it can be quickly be 1131 // removed on disconnect. 1132 wsc.addrRequests[addr] = struct{}{} 1133 1134 // Add the client to the set of clients to notify when the 1135 // outpoint is seen. Create map as needed. 
1136 cmap, ok := addrMap[addr] 1137 if !ok { 1138 cmap = make(map[chan struct{}]*wsClient) 1139 addrMap[addr] = cmap 1140 } 1141 cmap[wsc.quit] = wsc 1142 } 1143} 1144 1145// UnregisterTxOutAddressRequest removes a request from the passed websocket 1146// client to be notified when a transaction spends to the passed address. 1147func (m *wsNotificationManager) UnregisterTxOutAddressRequest(wsc *wsClient, addr string) { 1148 m.queueNotification <- ¬ificationUnregisterAddr{ 1149 wsc: wsc, 1150 addr: addr, 1151 } 1152} 1153 1154// removeAddrRequest removes the websocket client wsc from the address to 1155// client set addrs so it will no longer receive notification updates for 1156// any transaction outputs send to addr. 1157func (*wsNotificationManager) removeAddrRequest(addrs map[string]map[chan struct{}]*wsClient, 1158 wsc *wsClient, addr string) { 1159 1160 // Remove the request tracking from the client. 1161 delete(wsc.addrRequests, addr) 1162 1163 // Remove the client from the list to notify. 1164 cmap, ok := addrs[addr] 1165 if !ok { 1166 rpcsLog.Warnf("Attempt to remove nonexistent addr request "+ 1167 "<%s> for websocket client %s", addr, wsc.addr) 1168 return 1169 } 1170 delete(cmap, wsc.quit) 1171 1172 // Remove the map entry altogether if there are no more clients 1173 // interested in it. 1174 if len(cmap) == 0 { 1175 delete(addrs, addr) 1176 } 1177} 1178 1179// AddClient adds the passed websocket client to the notification manager. 1180func (m *wsNotificationManager) AddClient(wsc *wsClient) { 1181 m.queueNotification <- (*notificationRegisterClient)(wsc) 1182} 1183 1184// RemoveClient removes the passed websocket client and all notifications 1185// registered for it. 
func (m *wsNotificationManager) RemoveClient(wsc *wsClient) {
	// Give up on queuing the removal once the manager's quit channel is
	// closed so this call does not wait on the queue after shutdown.
	select {
	case m.queueNotification <- (*notificationUnregisterClient)(wsc):
	case <-m.quit:
	}
}

// Start starts the goroutines required for the manager to queue and process
// websocket client notifications.
func (m *wsNotificationManager) Start() {
	m.wg.Add(2)
	go m.queueHandler()
	go m.notificationHandler()
}

// WaitForShutdown blocks until all notification manager goroutines have
// finished.
func (m *wsNotificationManager) WaitForShutdown() {
	m.wg.Wait()
}

// Shutdown shuts down the manager, stopping the notification queue and
// notification handler goroutines.  It does so by closing the quit channel.
func (m *wsNotificationManager) Shutdown() {
	close(m.quit)
}

// newWsNotificationManager returns a new notification manager ready for use.
// All of its channels are unbuffered.  See wsNotificationManager for more
// details.
func newWsNotificationManager(server *rpcServer) *wsNotificationManager {
	return &wsNotificationManager{
		server:            server,
		queueNotification: make(chan interface{}),
		notificationMsgs:  make(chan interface{}),
		numClients:        make(chan int),
		quit:              make(chan struct{}),
	}
}

// wsResponse houses a message to send to a connected websocket client as
// well as a channel to reply on when the message is sent.  The outHandler
// sends true on doneChan after a successful write and false when the message
// is discarded during cleanup; a nil doneChan means no reply is wanted.
type wsResponse struct {
	msg      []byte
	doneChan chan bool
}

// wsClient provides an abstraction for handling a websocket client.  The
// overall data flow is split into 3 main goroutines, a possible 4th goroutine
// for long-running operations (only started if request is made), and a
// websocket manager which is used to allow things such as broadcasting
// requested notifications to all connected websocket clients.   Inbound
// messages are read via the inHandler goroutine and generally dispatched to
// their own handler.  However, certain potentially long-running operations such
// as rescans, are sent to the asyncHandler goroutine and are limited to one at a
// time.  There are two outbound message types - one for responding to client
// requests and another for async notifications.  Responses to client requests
// use SendMessage which employs a buffered channel thereby limiting the number
// of outstanding requests that can be made.  Notifications are sent via
// QueueNotification which implements a queue via notificationQueueHandler to
// ensure sending notifications from other subsystems can't block.  Ultimately,
// all messages are sent via the outHandler.
type wsClient struct {
	sync.Mutex

	// server is the RPC server that is servicing the client.
	server *rpcServer

	// conn is the underlying websocket connection.
	conn *websocket.Conn

	// disconnected indicates whether or not the websocket client is
	// disconnected.
	disconnected bool

	// addr is the remote address of the client.
	addr string

	// authenticated specifies whether a client has been authenticated
	// and therefore is allowed to communicate over the websocket.
	authenticated bool

	// isAdmin specifies whether a client may change the state of the server;
	// false means its access is only to the limited set of RPC calls.
	isAdmin bool

	// sessionID is a random ID generated for each client when connected.
	// These IDs may be queried by a client using the session RPC.  A change
	// to the session ID indicates that the client reconnected.
	sessionID uint64

	// verboseTxUpdates specifies whether a client has requested verbose
	// information about all new transactions.
	verboseTxUpdates bool

	// addrRequests is a set of addresses the caller has requested to be
	// notified about.  It is maintained here so all requests can be removed
	// when a wallet disconnects.  Owned by the notification manager.
	addrRequests map[string]struct{}

	// spentRequests is a set of unspent Outpoints a wallet has requested
	// notifications for when they are spent by a processed transaction.
	// Owned by the notification manager.
	spentRequests map[wire.OutPoint]struct{}

	// filterData is the new generation transaction filter backported from
	// github.com/decred/dcrd for the new backported `loadtxfilter` and
	// `rescanblocks` methods.
	filterData *wsClientFilter

	// Networking infrastructure.
	serviceRequestSem semaphore
	ntfnChan          chan []byte
	sendChan          chan wsResponse
	quit              chan struct{}
	wg                sync.WaitGroup
}

// inHandler handles all incoming messages for the websocket connection.  It
// must be run as a goroutine.
//
// Each message is read, unmarshalled, parsed, and authenticated before being
// handed to serviceRequest in a new goroutine.  An unauthenticated client is
// disconnected on any malformed or unexpected message, while an authenticated
// client receives an error reply and the loop continues.  On exit the client
// is disconnected and the client wait group is signalled.
func (c *wsClient) inHandler() {
out:
	for {
		// Break out of the loop once the quit channel has been closed.
		// Use a non-blocking select here so we fall through otherwise.
		select {
		case <-c.quit:
			break out
		default:
		}

		_, msg, err := c.conn.ReadMessage()
		if err != nil {
			// Log the error if it's not due to disconnecting.
			if err != io.EOF {
				rpcsLog.Errorf("Websocket receive error from "+
					"%s: %v", c.addr, err)
			}
			break out
		}

		var request btcjson.Request
		err = json.Unmarshal(msg, &request)
		if err != nil {
			// Unauthenticated clients get no parse-error reply; they are
			// simply disconnected.
			if !c.authenticated {
				break out
			}

			jsonErr := &btcjson.RPCError{
				Code:    btcjson.ErrRPCParse.Code,
				Message: "Failed to parse request: " + err.Error(),
			}
			reply, err := createMarshalledReply(nil, nil, jsonErr)
			if err != nil {
				rpcsLog.Errorf("Failed to marshal parse failure "+
					"reply: %v", err)
				continue
			}
			c.SendMessage(reply, nil)
			continue
		}

		// The JSON-RPC 1.0 spec defines that notifications must have their "id"
		// set to null and states that notifications do not have a response.
		//
		// A JSON-RPC 2.0 notification is a request with "json-rpc":"2.0", and
		// without an "id" member. The specification states that notifications
		// must not be responded to. JSON-RPC 2.0 permits the null value as a
		// valid request id, therefore such requests are not notifications.
		//
		// Bitcoin Core serves requests with "id":null or even an absent "id",
		// and responds to such requests with "id":null in the response.
		//
		// Btcd does not respond to any request without an "id" or "id":null,
		// regardless of the indicated JSON-RPC protocol version unless RPC quirks
		// are enabled. With RPC quirks enabled, such requests will be responded
		// to if the request does not indicate JSON-RPC version.
		//
		// RPC quirks can be enabled by the user to avoid compatibility issues
		// with software relying on Core's behavior.
		if request.ID == nil && !(cfg.RPCQuirks && request.Jsonrpc == "") {
			if !c.authenticated {
				break out
			}
			continue
		}

		cmd := parseCmd(&request)
		if cmd.err != nil {
			if !c.authenticated {
				break out
			}

			reply, err := createMarshalledReply(cmd.id, nil, cmd.err)
			if err != nil {
				rpcsLog.Errorf("Failed to marshal parse failure "+
					"reply: %v", err)
				continue
			}
			c.SendMessage(reply, nil)
			continue
		}
		rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr)

		// Check auth.  The client is immediately disconnected if the
		// first request of an unauthenticated websocket client is not
		// the authenticate request, an authenticate request is received
		// when the client is already authenticated, or incorrect
		// authentication credentials are provided in the request.
		switch authCmd, ok := cmd.cmd.(*btcjson.AuthenticateCmd); {
		case c.authenticated && ok:
			rpcsLog.Warnf("Websocket client %s is already authenticated",
				c.addr)
			break out
		case !c.authenticated && !ok:
			rpcsLog.Warnf("Unauthenticated websocket message " +
				"received")
			break out
		case !c.authenticated:
			// Check credentials.  The SHA-256 of the "Basic <base64>"
			// string is compared in constant time against both the
			// admin and the limited-user digests held by the server.
			login := authCmd.Username + ":" + authCmd.Passphrase
			auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
			authSha := sha256.Sum256([]byte(auth))
			cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
			limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
			if cmp != 1 && limitcmp != 1 {
				rpcsLog.Warnf("Auth failure.")
				break out
			}
			c.authenticated = true
			// Only a match against the admin digest grants admin rights.
			c.isAdmin = cmp == 1

			// Marshal and send response.
			reply, err := createMarshalledReply(cmd.id, nil, nil)
			if err != nil {
				rpcsLog.Errorf("Failed to marshal authenticate reply: "+
					"%v", err.Error())
				continue
			}
			c.SendMessage(reply, nil)
			continue
		}

		// Check if the client is using limited RPC credentials and
		// error when not authorized to call this RPC.
		if !c.isAdmin {
			if _, ok := rpcLimited[request.Method]; !ok {
				jsonErr := &btcjson.RPCError{
					Code:    btcjson.ErrRPCInvalidParams.Code,
					Message: "limited user not authorized for this method",
				}
				// Marshal and send response.
				reply, err := createMarshalledReply(request.ID, nil, jsonErr)
				if err != nil {
					rpcsLog.Errorf("Failed to marshal parse failure "+
						"reply: %v", err)
					continue
				}
				c.SendMessage(reply, nil)
				continue
			}
		}

		// Asynchronously handle the request.  A semaphore is used to
		// limit the number of concurrent requests currently being
		// serviced.  If the semaphore can not be acquired, simply wait
		// until a request finished before reading the next RPC request
		// from the websocket client.
		//
		// This could be a little fancier by timing out and erroring
		// when it takes too long to service the request, but if that is
		// done, the read of the next request should not be blocked by
		// this semaphore, otherwise the next request will be read and
		// will probably sit here for another few seconds before timing
		// out as well.  This will cause the total timeout duration for
		// later requests to be much longer than the check here would
		// imply.
		//
		// If a timeout is added, the semaphore acquiring should be
		// moved inside of the new goroutine with a select statement
		// that also reads a time.After channel.  This will unblock the
		// read of the next request from the websocket client and allow
		// many requests to be waited on concurrently.
		c.serviceRequestSem.acquire()
		go func() {
			c.serviceRequest(cmd)
			c.serviceRequestSem.release()
		}()
	}

	// Ensure the connection is closed.
	c.Disconnect()
	c.wg.Done()
	rpcsLog.Tracef("Websocket client input handler done for %s", c.addr)
}

// serviceRequest services a parsed RPC request by looking up and executing the
// appropriate RPC handler.  The response is marshalled and sent to the
// websocket client.  If the reply cannot be marshalled the failure is logged
// and nothing is sent.
func (c *wsClient) serviceRequest(r *parsedRPCCmd) {
	var (
		result interface{}
		err    error
	)

	// Lookup the websocket extension for the command and if it doesn't
	// exist fallback to handling the command as a standard command.
	wsHandler, ok := wsHandlers[r.method]
	if ok {
		result, err = wsHandler(c, r.cmd)
	} else {
		result, err = c.server.standardCmdResult(r, nil)
	}
	// The handler's error, if any, is folded into the reply itself; err is
	// then reused for the marshalling outcome.
	reply, err := createMarshalledReply(r.id, result, err)
	if err != nil {
		rpcsLog.Errorf("Failed to marshal reply for <%s> "+
			"command: %v", r.method, err)
		return
	}
	c.SendMessage(reply, nil)
}

// notificationQueueHandler handles the queuing of outgoing notifications for
// the websocket client.  This runs as a muxer for various sources of input to
// ensure that queuing up notifications to be sent will not block.  Otherwise,
// slow clients could bog down the other systems (such as the mempool or block
// manager) which are queuing the data.  The data is passed on to outHandler to
// actually be written.  It must be run as a goroutine.
func (c *wsClient) notificationQueueHandler() {
	ntfnSentChan := make(chan bool, 1) // nonblocking sync

	// pendingNtfns is used as a queue for notifications that are ready to
	// be sent once there are no outstanding notifications currently being
	// sent.  The waiting flag is used over simply checking for items in the
	// pending list to ensure cleanup knows what has and hasn't been sent
	// to the outHandler.  Currently no special cleanup is needed, however
	// if something like a done channel is added to notifications in the
	// future, not knowing what has and hasn't been sent to the outHandler
	// (and thus who should respond to the done channel) would be
	// problematic without using this approach.
	pendingNtfns := list.New()
	waiting := false
out:
	for {
		select {
		// This channel is notified when a message is being queued to
		// be sent across the network socket.  It will either send the
		// message immediately if a send is not already in progress, or
		// queue the message to be sent once the other pending messages
		// are sent.
		case msg := <-c.ntfnChan:
			if !waiting {
				c.SendMessage(msg, ntfnSentChan)
			} else {
				pendingNtfns.PushBack(msg)
			}
			waiting = true

		// This channel is notified when a notification has been sent
		// across the network socket.
		case <-ntfnSentChan:
			// No longer waiting if there are no more messages in
			// the pending messages queue.
			next := pendingNtfns.Front()
			if next == nil {
				waiting = false
				continue
			}

			// Notify the outHandler about the next item to
			// asynchronously send.
			msg := pendingNtfns.Remove(next).([]byte)
			c.SendMessage(msg, ntfnSentChan)

		case <-c.quit:
			break out
		}
	}

	// Drain any wait channels before exiting so nothing is left waiting
	// around to send.
cleanup:
	for {
		select {
		case <-c.ntfnChan:
		case <-ntfnSentChan:
		default:
			break cleanup
		}
	}
	c.wg.Done()
	rpcsLog.Tracef("Websocket client notification queue handler done "+
		"for %s", c.addr)
}

// outHandler handles all outgoing messages for the websocket connection.  It
// must be run as a goroutine.  It uses a buffered channel to serialize output
// messages while allowing the sender to continue running asynchronously.
//
// A write failure disconnects the client.  On exit any messages still queued
// on the send channel are discarded, replying false on their done channels.
func (c *wsClient) outHandler() {
out:
	for {
		// Send any messages ready for send until the quit channel is
		// closed.
		select {
		case r := <-c.sendChan:
			err := c.conn.WriteMessage(websocket.TextMessage, r.msg)
			if err != nil {
				c.Disconnect()
				break out
			}
			if r.doneChan != nil {
				r.doneChan <- true
			}

		case <-c.quit:
			break out
		}
	}

	// Drain any wait channels before exiting so nothing is left waiting
	// around to send.
cleanup:
	for {
		select {
		case r := <-c.sendChan:
			if r.doneChan != nil {
				r.doneChan <- false
			}
		default:
			break cleanup
		}
	}
	c.wg.Done()
	rpcsLog.Tracef("Websocket client output handler done for %s", c.addr)
}

// SendMessage sends the passed json to the websocket client.  It is backed
// by a buffered channel, so it will not block until the send channel is full.
// Note however that QueueNotification must be used for sending async
// notifications instead of this function.  This approach allows a limit to
// the number of outstanding requests a client can make without preventing or
// blocking on async notifications.
//
// When the client is already disconnected the message is dropped and, if
// doneChan is non-nil, false is sent on it.
func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
	// Don't send the message if disconnected.
	if c.Disconnected() {
		if doneChan != nil {
			doneChan <- false
		}
		return
	}

	c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan}
}

// ErrClientQuit describes the error where a client send is not processed due
// to the client having already been disconnected or dropped.
var ErrClientQuit = errors.New("client quit")

// QueueNotification queues the passed notification to be sent to the websocket
// client.  This function, as the name implies, is only intended for
// notifications since it has additional logic to prevent other subsystems, such
// as the memory pool and block manager, from blocking even when the send
// channel is full.
//
// If the client is in the process of shutting down, this function returns
// ErrClientQuit.  This is intended to be checked by long-running notification
// handlers to stop processing if there is no more work needed to be done.
func (c *wsClient) QueueNotification(marshalledJSON []byte) error {
	// Don't queue the message if disconnected.
	if c.Disconnected() {
		return ErrClientQuit
	}

	c.ntfnChan <- marshalledJSON
	return nil
}

// Disconnected returns whether or not the websocket client is disconnected.
// The flag is read while holding the client mutex.
func (c *wsClient) Disconnected() bool {
	c.Lock()
	isDisconnected := c.disconnected
	c.Unlock()

	return isDisconnected
}

// Disconnect disconnects the websocket client.  It closes the quit channel
// and the underlying connection and marks the client disconnected, all while
// holding the client mutex.  Calling it again is a no-op.
func (c *wsClient) Disconnect() {
	c.Lock()
	defer c.Unlock()

	// Nothing to do if already disconnected.
	if c.disconnected {
		return
	}

	rpcsLog.Tracef("Disconnecting websocket client %s", c.addr)
	close(c.quit)
	c.conn.Close()
	c.disconnected = true
}

// Start begins processing input and output messages.
1689func (c *wsClient) Start() { 1690 rpcsLog.Tracef("Starting websocket client %s", c.addr) 1691 1692 // Start processing input and output. 1693 c.wg.Add(3) 1694 go c.inHandler() 1695 go c.notificationQueueHandler() 1696 go c.outHandler() 1697} 1698 1699// WaitForShutdown blocks until the websocket client goroutines are stopped 1700// and the connection is closed. 1701func (c *wsClient) WaitForShutdown() { 1702 c.wg.Wait() 1703} 1704 1705// newWebsocketClient returns a new websocket client given the notification 1706// manager, websocket connection, remote address, and whether or not the client 1707// has already been authenticated (via HTTP Basic access authentication). The 1708// returned client is ready to start. Once started, the client will process 1709// incoming and outgoing messages in separate goroutines complete with queuing 1710// and asynchrous handling for long-running operations. 1711func newWebsocketClient(server *rpcServer, conn *websocket.Conn, 1712 remoteAddr string, authenticated bool, isAdmin bool) (*wsClient, error) { 1713 1714 sessionID, err := wire.RandomUint64() 1715 if err != nil { 1716 return nil, err 1717 } 1718 1719 client := &wsClient{ 1720 conn: conn, 1721 addr: remoteAddr, 1722 authenticated: authenticated, 1723 isAdmin: isAdmin, 1724 sessionID: sessionID, 1725 server: server, 1726 addrRequests: make(map[string]struct{}), 1727 spentRequests: make(map[wire.OutPoint]struct{}), 1728 serviceRequestSem: makeSemaphore(cfg.RPCMaxConcurrentReqs), 1729 ntfnChan: make(chan []byte, 1), // nonblocking sync 1730 sendChan: make(chan wsResponse, websocketSendBufferSize), 1731 quit: make(chan struct{}), 1732 } 1733 return client, nil 1734} 1735 1736// handleWebsocketHelp implements the help command for websocket connections. 
1737func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) { 1738 cmd, ok := icmd.(*btcjson.HelpCmd) 1739 if !ok { 1740 return nil, btcjson.ErrRPCInternal 1741 } 1742 1743 // Provide a usage overview of all commands when no specific command 1744 // was specified. 1745 var command string 1746 if cmd.Command != nil { 1747 command = *cmd.Command 1748 } 1749 if command == "" { 1750 usage, err := wsc.server.helpCacher.rpcUsage(true) 1751 if err != nil { 1752 context := "Failed to generate RPC usage" 1753 return nil, internalRPCError(err.Error(), context) 1754 } 1755 return usage, nil 1756 } 1757 1758 // Check that the command asked for is supported and implemented. 1759 // Search the list of websocket handlers as well as the main list of 1760 // handlers since help should only be provided for those cases. 1761 valid := true 1762 if _, ok := rpcHandlers[command]; !ok { 1763 if _, ok := wsHandlers[command]; !ok { 1764 valid = false 1765 } 1766 } 1767 if !valid { 1768 return nil, &btcjson.RPCError{ 1769 Code: btcjson.ErrRPCInvalidParameter, 1770 Message: "Unknown command: " + command, 1771 } 1772 } 1773 1774 // Get the help for the command. 1775 help, err := wsc.server.helpCacher.rpcMethodHelp(command) 1776 if err != nil { 1777 context := "Failed to generate help" 1778 return nil, internalRPCError(err.Error(), context) 1779 } 1780 return help, nil 1781} 1782 1783// handleLoadTxFilter implements the loadtxfilter command extension for 1784// websocket connections. 
1785// 1786// NOTE: This extension is ported from github.com/decred/dcrd 1787func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) { 1788 cmd := icmd.(*btcjson.LoadTxFilterCmd) 1789 1790 outPoints := make([]wire.OutPoint, len(cmd.OutPoints)) 1791 for i := range cmd.OutPoints { 1792 hash, err := chainhash.NewHashFromStr(cmd.OutPoints[i].Hash) 1793 if err != nil { 1794 return nil, &btcjson.RPCError{ 1795 Code: btcjson.ErrRPCInvalidParameter, 1796 Message: err.Error(), 1797 } 1798 } 1799 outPoints[i] = wire.OutPoint{ 1800 Hash: *hash, 1801 Index: cmd.OutPoints[i].Index, 1802 } 1803 } 1804 1805 params := wsc.server.cfg.ChainParams 1806 1807 wsc.Lock() 1808 if cmd.Reload || wsc.filterData == nil { 1809 wsc.filterData = newWSClientFilter(cmd.Addresses, outPoints, 1810 params) 1811 wsc.Unlock() 1812 } else { 1813 wsc.Unlock() 1814 1815 wsc.filterData.mu.Lock() 1816 for _, a := range cmd.Addresses { 1817 wsc.filterData.addAddressStr(a, params) 1818 } 1819 for i := range outPoints { 1820 wsc.filterData.addUnspentOutPoint(&outPoints[i]) 1821 } 1822 wsc.filterData.mu.Unlock() 1823 } 1824 1825 return nil, nil 1826} 1827 1828// handleNotifyBlocks implements the notifyblocks command extension for 1829// websocket connections. 1830func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { 1831 wsc.server.ntfnMgr.RegisterBlockUpdates(wsc) 1832 return nil, nil 1833} 1834 1835// handleSession implements the session command extension for websocket 1836// connections. 1837func handleSession(wsc *wsClient, icmd interface{}) (interface{}, error) { 1838 return &btcjson.SessionResult{SessionID: wsc.sessionID}, nil 1839} 1840 1841// handleStopNotifyBlocks implements the stopnotifyblocks command extension for 1842// websocket connections. 
1843func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { 1844 wsc.server.ntfnMgr.UnregisterBlockUpdates(wsc) 1845 return nil, nil 1846} 1847 1848// handleNotifySpent implements the notifyspent command extension for 1849// websocket connections. 1850func handleNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) { 1851 cmd, ok := icmd.(*btcjson.NotifySpentCmd) 1852 if !ok { 1853 return nil, btcjson.ErrRPCInternal 1854 } 1855 1856 outpoints, err := deserializeOutpoints(cmd.OutPoints) 1857 if err != nil { 1858 return nil, err 1859 } 1860 1861 wsc.server.ntfnMgr.RegisterSpentRequests(wsc, outpoints) 1862 return nil, nil 1863} 1864 1865// handleNotifyNewTransations implements the notifynewtransactions command 1866// extension for websocket connections. 1867func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) { 1868 cmd, ok := icmd.(*btcjson.NotifyNewTransactionsCmd) 1869 if !ok { 1870 return nil, btcjson.ErrRPCInternal 1871 } 1872 1873 wsc.verboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose 1874 wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc) 1875 return nil, nil 1876} 1877 1878// handleStopNotifyNewTransations implements the stopnotifynewtransactions 1879// command extension for websocket connections. 1880func handleStopNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) { 1881 wsc.server.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc) 1882 return nil, nil 1883} 1884 1885// handleNotifyReceived implements the notifyreceived command extension for 1886// websocket connections. 1887func handleNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) { 1888 cmd, ok := icmd.(*btcjson.NotifyReceivedCmd) 1889 if !ok { 1890 return nil, btcjson.ErrRPCInternal 1891 } 1892 1893 // Decode addresses to validate input, but the strings slice is used 1894 // directly if these are all ok. 
1895 err := checkAddressValidity(cmd.Addresses, wsc.server.cfg.ChainParams) 1896 if err != nil { 1897 return nil, err 1898 } 1899 1900 wsc.server.ntfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses) 1901 return nil, nil 1902} 1903 1904// handleStopNotifySpent implements the stopnotifyspent command extension for 1905// websocket connections. 1906func handleStopNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) { 1907 cmd, ok := icmd.(*btcjson.StopNotifySpentCmd) 1908 if !ok { 1909 return nil, btcjson.ErrRPCInternal 1910 } 1911 1912 outpoints, err := deserializeOutpoints(cmd.OutPoints) 1913 if err != nil { 1914 return nil, err 1915 } 1916 1917 for _, outpoint := range outpoints { 1918 wsc.server.ntfnMgr.UnregisterSpentRequest(wsc, outpoint) 1919 } 1920 1921 return nil, nil 1922} 1923 1924// handleStopNotifyReceived implements the stopnotifyreceived command extension 1925// for websocket connections. 1926func handleStopNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) { 1927 cmd, ok := icmd.(*btcjson.StopNotifyReceivedCmd) 1928 if !ok { 1929 return nil, btcjson.ErrRPCInternal 1930 } 1931 1932 // Decode addresses to validate input, but the strings slice is used 1933 // directly if these are all ok. 1934 err := checkAddressValidity(cmd.Addresses, wsc.server.cfg.ChainParams) 1935 if err != nil { 1936 return nil, err 1937 } 1938 1939 for _, addr := range cmd.Addresses { 1940 wsc.server.ntfnMgr.UnregisterTxOutAddressRequest(wsc, addr) 1941 } 1942 1943 return nil, nil 1944} 1945 1946// checkAddressValidity checks the validity of each address in the passed 1947// string slice. It does this by attempting to decode each address using the 1948// current active network parameters. If any single address fails to decode 1949// properly, the function returns an error. Otherwise, nil is returned. 
1950func checkAddressValidity(addrs []string, params *chaincfg.Params) error { 1951 for _, addr := range addrs { 1952 _, err := btcutil.DecodeAddress(addr, params) 1953 if err != nil { 1954 return &btcjson.RPCError{ 1955 Code: btcjson.ErrRPCInvalidAddressOrKey, 1956 Message: fmt.Sprintf("Invalid address or key: %v", 1957 addr), 1958 } 1959 } 1960 } 1961 return nil 1962} 1963 1964// deserializeOutpoints deserializes each serialized outpoint. 1965func deserializeOutpoints(serializedOuts []btcjson.OutPoint) ([]*wire.OutPoint, error) { 1966 outpoints := make([]*wire.OutPoint, 0, len(serializedOuts)) 1967 for i := range serializedOuts { 1968 blockHash, err := chainhash.NewHashFromStr(serializedOuts[i].Hash) 1969 if err != nil { 1970 return nil, rpcDecodeHexError(serializedOuts[i].Hash) 1971 } 1972 index := serializedOuts[i].Index 1973 outpoints = append(outpoints, wire.NewOutPoint(blockHash, index)) 1974 } 1975 1976 return outpoints, nil 1977} 1978 1979type rescanKeys struct { 1980 addrs map[string]struct{} 1981 unspent map[wire.OutPoint]struct{} 1982} 1983 1984// unspentSlice returns a slice of currently-unspent outpoints for the rescan 1985// lookup keys. This is primarily intended to be used to register outpoints 1986// for continuous notifications after a rescan has completed. 1987func (r *rescanKeys) unspentSlice() []*wire.OutPoint { 1988 ops := make([]*wire.OutPoint, 0, len(r.unspent)) 1989 for op := range r.unspent { 1990 opCopy := op 1991 ops = append(ops, &opCopy) 1992 } 1993 return ops 1994} 1995 1996// ErrRescanReorg defines the error that is returned when an unrecoverable 1997// reorganize is detected during a rescan. 1998var ErrRescanReorg = btcjson.RPCError{ 1999 Code: btcjson.ErrRPCDatabase, 2000 Message: "Reorganize", 2001} 2002 2003// rescanBlock rescans all transactions in a single block. This is a helper 2004// function for handleRescan. 
2005func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) { 2006 for _, tx := range blk.Transactions() { 2007 // Hexadecimal representation of this tx. Only created if 2008 // needed, and reused for later notifications if already made. 2009 var txHex string 2010 2011 // All inputs and outputs must be iterated through to correctly 2012 // modify the unspent map, however, just a single notification 2013 // for any matching transaction inputs or outputs should be 2014 // created and sent. 2015 spentNotified := false 2016 recvNotified := false 2017 2018 // notifySpend is a closure we'll use when we first detect that 2019 // a transactions spends an outpoint/script in our filter list. 2020 notifySpend := func() error { 2021 if txHex == "" { 2022 txHex = txHexString(tx.MsgTx()) 2023 } 2024 marshalledJSON, err := newRedeemingTxNotification( 2025 txHex, tx.Index(), blk, 2026 ) 2027 if err != nil { 2028 return fmt.Errorf("unable to marshal "+ 2029 "btcjson.RedeeminTxNtfn: %v", err) 2030 } 2031 2032 return wsc.QueueNotification(marshalledJSON) 2033 } 2034 2035 // We'll start by iterating over the transaction's inputs to 2036 // determine if it spends an outpoint/script in our filter list. 2037 for _, txin := range tx.MsgTx().TxIn { 2038 // If it spends an outpoint, we'll dispatch a spend 2039 // notification for the transaction. 2040 if _, ok := lookups.unspent[txin.PreviousOutPoint]; ok { 2041 delete(lookups.unspent, txin.PreviousOutPoint) 2042 2043 if spentNotified { 2044 continue 2045 } 2046 2047 err := notifySpend() 2048 2049 // Stop the rescan early if the websocket client 2050 // disconnected. 
2051 if err == ErrClientQuit { 2052 return 2053 } 2054 if err != nil { 2055 rpcsLog.Errorf("Unable to notify "+ 2056 "redeeming transaction %v: %v", 2057 tx.Hash(), err) 2058 continue 2059 } 2060 2061 spentNotified = true 2062 } 2063 2064 // We'll also recompute the pkScript the input is 2065 // attempting to spend to determine whether it is 2066 // relevant to us. 2067 pkScript, err := txscript.ComputePkScript( 2068 txin.SignatureScript, txin.Witness, 2069 ) 2070 if err != nil { 2071 continue 2072 } 2073 addr, err := pkScript.Address(wsc.server.cfg.ChainParams) 2074 if err != nil { 2075 continue 2076 } 2077 2078 // If it is, we'll also dispatch a spend notification 2079 // for this transaction if we haven't already. 2080 if _, ok := lookups.addrs[addr.String()]; ok { 2081 if spentNotified { 2082 continue 2083 } 2084 2085 err := notifySpend() 2086 2087 // Stop the rescan early if the websocket client 2088 // disconnected. 2089 if err == ErrClientQuit { 2090 return 2091 } 2092 if err != nil { 2093 rpcsLog.Errorf("Unable to notify "+ 2094 "redeeming transaction %v: %v", 2095 tx.Hash(), err) 2096 continue 2097 } 2098 2099 spentNotified = true 2100 } 2101 } 2102 2103 for txOutIdx, txout := range tx.MsgTx().TxOut { 2104 _, addrs, _, _ := txscript.ExtractPkScriptAddrs( 2105 txout.PkScript, wsc.server.cfg.ChainParams) 2106 2107 for _, addr := range addrs { 2108 if _, ok := lookups.addrs[addr.String()]; !ok { 2109 continue 2110 } 2111 2112 outpoint := wire.OutPoint{ 2113 Hash: *tx.Hash(), 2114 Index: uint32(txOutIdx), 2115 } 2116 lookups.unspent[outpoint] = struct{}{} 2117 2118 if recvNotified { 2119 continue 2120 } 2121 2122 if txHex == "" { 2123 txHex = txHexString(tx.MsgTx()) 2124 } 2125 ntfn := btcjson.NewRecvTxNtfn(txHex, 2126 blockDetails(blk, tx.Index())) 2127 2128 marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) 2129 if err != nil { 2130 rpcsLog.Errorf("Failed to marshal recvtx notification: %v", err) 2131 return 2132 } 2133 2134 err = 
wsc.QueueNotification(marshalledJSON) 2135 // Stop the rescan early if the websocket client 2136 // disconnected. 2137 if err == ErrClientQuit { 2138 return 2139 } 2140 recvNotified = true 2141 } 2142 } 2143 } 2144} 2145 2146// rescanBlockFilter rescans a block for any relevant transactions for the 2147// passed lookup keys. Any discovered transactions are returned hex encoded as 2148// a string slice. 2149// 2150// NOTE: This extension is ported from github.com/decred/dcrd 2151func rescanBlockFilter(filter *wsClientFilter, block *btcutil.Block, params *chaincfg.Params) []string { 2152 var transactions []string 2153 2154 filter.mu.Lock() 2155 for _, tx := range block.Transactions() { 2156 msgTx := tx.MsgTx() 2157 2158 // Keep track of whether the transaction has already been added 2159 // to the result. It shouldn't be added twice. 2160 added := false 2161 2162 // Scan inputs if not a coinbase transaction. 2163 if !blockchain.IsCoinBaseTx(msgTx) { 2164 for _, input := range msgTx.TxIn { 2165 if !filter.existsUnspentOutPoint(&input.PreviousOutPoint) { 2166 continue 2167 } 2168 if !added { 2169 transactions = append( 2170 transactions, 2171 txHexString(msgTx)) 2172 added = true 2173 } 2174 } 2175 } 2176 2177 // Scan outputs. 2178 for i, output := range msgTx.TxOut { 2179 _, addrs, _, err := txscript.ExtractPkScriptAddrs( 2180 output.PkScript, params) 2181 if err != nil { 2182 continue 2183 } 2184 for _, a := range addrs { 2185 if !filter.existsAddress(a) { 2186 continue 2187 } 2188 2189 op := wire.OutPoint{ 2190 Hash: *tx.Hash(), 2191 Index: uint32(i), 2192 } 2193 filter.addUnspentOutPoint(&op) 2194 2195 if !added { 2196 transactions = append( 2197 transactions, 2198 txHexString(msgTx)) 2199 added = true 2200 } 2201 } 2202 } 2203 } 2204 filter.mu.Unlock() 2205 2206 return transactions 2207} 2208 2209// handleRescanBlocks implements the rescanblocks command extension for 2210// websocket connections. 
2211// 2212// NOTE: This extension is ported from github.com/decred/dcrd 2213func handleRescanBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { 2214 cmd, ok := icmd.(*btcjson.RescanBlocksCmd) 2215 if !ok { 2216 return nil, btcjson.ErrRPCInternal 2217 } 2218 2219 // Load client's transaction filter. Must exist in order to continue. 2220 wsc.Lock() 2221 filter := wsc.filterData 2222 wsc.Unlock() 2223 if filter == nil { 2224 return nil, &btcjson.RPCError{ 2225 Code: btcjson.ErrRPCMisc, 2226 Message: "Transaction filter must be loaded before rescanning", 2227 } 2228 } 2229 2230 blockHashes := make([]*chainhash.Hash, len(cmd.BlockHashes)) 2231 2232 for i := range cmd.BlockHashes { 2233 hash, err := chainhash.NewHashFromStr(cmd.BlockHashes[i]) 2234 if err != nil { 2235 return nil, err 2236 } 2237 blockHashes[i] = hash 2238 } 2239 2240 discoveredData := make([]btcjson.RescannedBlock, 0, len(blockHashes)) 2241 2242 // Iterate over each block in the request and rescan. When a block 2243 // contains relevant transactions, add it to the response. 
2244 bc := wsc.server.cfg.Chain 2245 params := wsc.server.cfg.ChainParams 2246 var lastBlockHash *chainhash.Hash 2247 for i := range blockHashes { 2248 block, err := bc.BlockByHash(blockHashes[i]) 2249 if err != nil { 2250 return nil, &btcjson.RPCError{ 2251 Code: btcjson.ErrRPCBlockNotFound, 2252 Message: "Failed to fetch block: " + err.Error(), 2253 } 2254 } 2255 if lastBlockHash != nil && block.MsgBlock().Header.PrevBlock != *lastBlockHash { 2256 return nil, &btcjson.RPCError{ 2257 Code: btcjson.ErrRPCInvalidParameter, 2258 Message: fmt.Sprintf("Block %v is not a child of %v", 2259 blockHashes[i], lastBlockHash), 2260 } 2261 } 2262 lastBlockHash = blockHashes[i] 2263 2264 transactions := rescanBlockFilter(filter, block, params) 2265 if len(transactions) != 0 { 2266 discoveredData = append(discoveredData, btcjson.RescannedBlock{ 2267 Hash: cmd.BlockHashes[i], 2268 Transactions: transactions, 2269 }) 2270 } 2271 } 2272 2273 return &discoveredData, nil 2274} 2275 2276// recoverFromReorg attempts to recover from a detected reorganize during a 2277// rescan. It fetches a new range of block shas from the database and 2278// verifies that the new range of blocks is on the same fork as a previous 2279// range of blocks. If this condition does not hold true, the JSON-RPC error 2280// for an unrecoverable reorganize is returned. 
2281func recoverFromReorg(chain *blockchain.BlockChain, minBlock, maxBlock int32, 2282 lastBlock *chainhash.Hash) ([]chainhash.Hash, error) { 2283 2284 hashList, err := chain.HeightRange(minBlock, maxBlock) 2285 if err != nil { 2286 rpcsLog.Errorf("Error looking up block range: %v", err) 2287 return nil, &btcjson.RPCError{ 2288 Code: btcjson.ErrRPCDatabase, 2289 Message: "Database error: " + err.Error(), 2290 } 2291 } 2292 if lastBlock == nil || len(hashList) == 0 { 2293 return hashList, nil 2294 } 2295 2296 blk, err := chain.BlockByHash(&hashList[0]) 2297 if err != nil { 2298 rpcsLog.Errorf("Error looking up possibly reorged block: %v", 2299 err) 2300 return nil, &btcjson.RPCError{ 2301 Code: btcjson.ErrRPCDatabase, 2302 Message: "Database error: " + err.Error(), 2303 } 2304 } 2305 jsonErr := descendantBlock(lastBlock, blk) 2306 if jsonErr != nil { 2307 return nil, jsonErr 2308 } 2309 return hashList, nil 2310} 2311 2312// descendantBlock returns the appropriate JSON-RPC error if a current block 2313// fetched during a reorganize is not a direct child of the parent block hash. 2314func descendantBlock(prevHash *chainhash.Hash, curBlock *btcutil.Block) error { 2315 curHash := &curBlock.MsgBlock().Header.PrevBlock 2316 if !prevHash.IsEqual(curHash) { 2317 rpcsLog.Errorf("Stopping rescan for reorged block %v "+ 2318 "(replaced by block %v)", prevHash, curHash) 2319 return &ErrRescanReorg 2320 } 2321 return nil 2322} 2323 2324// scanBlockChunks executes a rescan in chunked stages. We do this to limit the 2325// amount of memory that we'll allocate to a given rescan. Every so often, 2326// we'll send back a rescan progress notification to the websockets client. The 2327// final block and block hash that we've scanned will be returned. 
2328func scanBlockChunks(wsc *wsClient, cmd *btcjson.RescanCmd, lookups *rescanKeys, minBlock, 2329 maxBlock int32, chain *blockchain.BlockChain) ( 2330 *btcutil.Block, *chainhash.Hash, error) { 2331 2332 // lastBlock and lastBlockHash track the previously-rescanned block. 2333 // They equal nil when no previous blocks have been rescanned. 2334 var ( 2335 lastBlock *btcutil.Block 2336 lastBlockHash *chainhash.Hash 2337 ) 2338 2339 // A ticker is created to wait at least 10 seconds before notifying the 2340 // websocket client of the current progress completed by the rescan. 2341 ticker := time.NewTicker(10 * time.Second) 2342 defer ticker.Stop() 2343 2344 // Instead of fetching all block shas at once, fetch in smaller chunks 2345 // to ensure large rescans consume a limited amount of memory. 2346fetchRange: 2347 for minBlock < maxBlock { 2348 // Limit the max number of hashes to fetch at once to the 2349 // maximum number of items allowed in a single inventory. 2350 // This value could be higher since it's not creating inventory 2351 // messages, but this mirrors the limiting logic used in the 2352 // peer-to-peer protocol. 2353 maxLoopBlock := maxBlock 2354 if maxLoopBlock-minBlock > wire.MaxInvPerMsg { 2355 maxLoopBlock = minBlock + wire.MaxInvPerMsg 2356 } 2357 hashList, err := chain.HeightRange(minBlock, maxLoopBlock) 2358 if err != nil { 2359 rpcsLog.Errorf("Error looking up block range: %v", err) 2360 return nil, nil, &btcjson.RPCError{ 2361 Code: btcjson.ErrRPCDatabase, 2362 Message: "Database error: " + err.Error(), 2363 } 2364 } 2365 if len(hashList) == 0 { 2366 // The rescan is finished if no blocks hashes for this 2367 // range were successfully fetched and a stop block 2368 // was provided. 2369 if maxBlock != math.MaxInt32 { 2370 break 2371 } 2372 2373 // If the rescan is through the current block, set up 2374 // the client to continue to receive notifications 2375 // regarding all rescanned addresses and the current set 2376 // of unspent outputs. 
2377 // 2378 // This is done safely by temporarily grabbing exclusive 2379 // access of the block manager. If no more blocks have 2380 // been attached between this pause and the fetch above, 2381 // then it is safe to register the websocket client for 2382 // continuous notifications if necessary. Otherwise, 2383 // continue the fetch loop again to rescan the new 2384 // blocks (or error due to an irrecoverable reorganize). 2385 pauseGuard := wsc.server.cfg.SyncMgr.Pause() 2386 best := wsc.server.cfg.Chain.BestSnapshot() 2387 curHash := &best.Hash 2388 again := true 2389 if lastBlockHash == nil || *lastBlockHash == *curHash { 2390 again = false 2391 n := wsc.server.ntfnMgr 2392 n.RegisterSpentRequests(wsc, lookups.unspentSlice()) 2393 n.RegisterTxOutAddressRequests(wsc, cmd.Addresses) 2394 } 2395 close(pauseGuard) 2396 if err != nil { 2397 rpcsLog.Errorf("Error fetching best block "+ 2398 "hash: %v", err) 2399 return nil, nil, &btcjson.RPCError{ 2400 Code: btcjson.ErrRPCDatabase, 2401 Message: "Database error: " + 2402 err.Error(), 2403 } 2404 } 2405 if again { 2406 continue 2407 } 2408 break 2409 } 2410 2411 loopHashList: 2412 for i := range hashList { 2413 blk, err := chain.BlockByHash(&hashList[i]) 2414 if err != nil { 2415 // Only handle reorgs if a block could not be 2416 // found for the hash. 2417 if dbErr, ok := err.(database.Error); !ok || 2418 dbErr.ErrorCode != database.ErrBlockNotFound { 2419 2420 rpcsLog.Errorf("Error looking up "+ 2421 "block: %v", err) 2422 return nil, nil, &btcjson.RPCError{ 2423 Code: btcjson.ErrRPCDatabase, 2424 Message: "Database error: " + 2425 err.Error(), 2426 } 2427 } 2428 2429 // If an absolute max block was specified, don't 2430 // attempt to handle the reorg. 
2431 if maxBlock != math.MaxInt32 { 2432 rpcsLog.Errorf("Stopping rescan for "+ 2433 "reorged block %v", 2434 cmd.EndBlock) 2435 return nil, nil, &ErrRescanReorg 2436 } 2437 2438 // If the lookup for the previously valid block 2439 // hash failed, there may have been a reorg. 2440 // Fetch a new range of block hashes and verify 2441 // that the previously processed block (if there 2442 // was any) still exists in the database. If it 2443 // doesn't, we error. 2444 // 2445 // A goto is used to branch executation back to 2446 // before the range was evaluated, as it must be 2447 // reevaluated for the new hashList. 2448 minBlock += int32(i) 2449 hashList, err = recoverFromReorg( 2450 chain, minBlock, maxBlock, lastBlockHash, 2451 ) 2452 if err != nil { 2453 return nil, nil, err 2454 } 2455 if len(hashList) == 0 { 2456 break fetchRange 2457 } 2458 goto loopHashList 2459 } 2460 if i == 0 && lastBlockHash != nil { 2461 // Ensure the new hashList is on the same fork 2462 // as the last block from the old hashList. 2463 jsonErr := descendantBlock(lastBlockHash, blk) 2464 if jsonErr != nil { 2465 return nil, nil, jsonErr 2466 } 2467 } 2468 2469 // A select statement is used to stop rescans if the 2470 // client requesting the rescan has disconnected. 2471 select { 2472 case <-wsc.quit: 2473 rpcsLog.Debugf("Stopped rescan at height %v "+ 2474 "for disconnected client", blk.Height()) 2475 return nil, nil, nil 2476 default: 2477 rescanBlock(wsc, lookups, blk) 2478 lastBlock = blk 2479 lastBlockHash = blk.Hash() 2480 } 2481 2482 // Periodically notify the client of the progress 2483 // completed. Continue with next block if no progress 2484 // notification is needed yet. 
2485 select { 2486 case <-ticker.C: // fallthrough 2487 default: 2488 continue 2489 } 2490 2491 n := btcjson.NewRescanProgressNtfn( 2492 hashList[i].String(), blk.Height(), 2493 blk.MsgBlock().Header.Timestamp.Unix(), 2494 ) 2495 mn, err := btcjson.MarshalCmd(nil, n) 2496 if err != nil { 2497 rpcsLog.Errorf("Failed to marshal rescan "+ 2498 "progress notification: %v", err) 2499 continue 2500 } 2501 2502 if err = wsc.QueueNotification(mn); err == ErrClientQuit { 2503 // Finished if the client disconnected. 2504 rpcsLog.Debugf("Stopped rescan at height %v "+ 2505 "for disconnected client", blk.Height()) 2506 return nil, nil, nil 2507 } 2508 } 2509 2510 minBlock += int32(len(hashList)) 2511 } 2512 2513 return lastBlock, lastBlockHash, nil 2514} 2515 2516// handleRescan implements the rescan command extension for websocket 2517// connections. 2518// 2519// NOTE: This does not smartly handle reorgs, and fixing requires database 2520// changes (for safe, concurrent access to full block ranges, and support 2521// for other chains than the best chain). It will, however, detect whether 2522// a reorg removed a block that was previously processed, and result in the 2523// handler erroring. Clients must handle this by finding a block still in 2524// the chain (perhaps from a rescanprogress notification) to resume their 2525// rescan. 
2526func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) { 2527 cmd, ok := icmd.(*btcjson.RescanCmd) 2528 if !ok { 2529 return nil, btcjson.ErrRPCInternal 2530 } 2531 2532 outpoints := make([]*wire.OutPoint, 0, len(cmd.OutPoints)) 2533 for i := range cmd.OutPoints { 2534 cmdOutpoint := &cmd.OutPoints[i] 2535 blockHash, err := chainhash.NewHashFromStr(cmdOutpoint.Hash) 2536 if err != nil { 2537 return nil, rpcDecodeHexError(cmdOutpoint.Hash) 2538 } 2539 outpoint := wire.NewOutPoint(blockHash, cmdOutpoint.Index) 2540 outpoints = append(outpoints, outpoint) 2541 } 2542 2543 numAddrs := len(cmd.Addresses) 2544 if numAddrs == 1 { 2545 rpcsLog.Info("Beginning rescan for 1 address") 2546 } else { 2547 rpcsLog.Infof("Beginning rescan for %d addresses", numAddrs) 2548 } 2549 2550 // Build lookup maps. 2551 lookups := rescanKeys{ 2552 addrs: map[string]struct{}{}, 2553 unspent: map[wire.OutPoint]struct{}{}, 2554 } 2555 for _, addrStr := range cmd.Addresses { 2556 lookups.addrs[addrStr] = struct{}{} 2557 } 2558 for _, outpoint := range outpoints { 2559 lookups.unspent[*outpoint] = struct{}{} 2560 } 2561 2562 chain := wsc.server.cfg.Chain 2563 2564 minBlockHash, err := chainhash.NewHashFromStr(cmd.BeginBlock) 2565 if err != nil { 2566 return nil, rpcDecodeHexError(cmd.BeginBlock) 2567 } 2568 minBlock, err := chain.BlockHeightByHash(minBlockHash) 2569 if err != nil { 2570 return nil, &btcjson.RPCError{ 2571 Code: btcjson.ErrRPCBlockNotFound, 2572 Message: "Error getting block: " + err.Error(), 2573 } 2574 } 2575 2576 maxBlock := int32(math.MaxInt32) 2577 if cmd.EndBlock != nil { 2578 maxBlockHash, err := chainhash.NewHashFromStr(*cmd.EndBlock) 2579 if err != nil { 2580 return nil, rpcDecodeHexError(*cmd.EndBlock) 2581 } 2582 maxBlock, err = chain.BlockHeightByHash(maxBlockHash) 2583 if err != nil { 2584 return nil, &btcjson.RPCError{ 2585 Code: btcjson.ErrRPCBlockNotFound, 2586 Message: "Error getting block: " + err.Error(), 2587 } 2588 } 2589 } 2590 2591 var 
( 2592 lastBlock *btcutil.Block 2593 lastBlockHash *chainhash.Hash 2594 ) 2595 if len(lookups.addrs) != 0 || len(lookups.unspent) != 0 { 2596 // With all the arguments parsed, we'll execute our chunked rescan 2597 // which will notify the clients of any address deposits or output 2598 // spends. 2599 lastBlock, lastBlockHash, err = scanBlockChunks( 2600 wsc, cmd, &lookups, minBlock, maxBlock, chain, 2601 ) 2602 if err != nil { 2603 return nil, err 2604 } 2605 2606 // If the last block is nil, then this means that the client 2607 // disconnected mid-rescan. As a result, we don't need to send 2608 // anything back to them. 2609 if lastBlock == nil { 2610 return nil, nil 2611 } 2612 } else { 2613 rpcsLog.Infof("Skipping rescan as client has no addrs/utxos") 2614 2615 // If we didn't actually do a rescan, then we'll give the 2616 // client our best known block within the final rescan finished 2617 // notification. 2618 chainTip := chain.BestSnapshot() 2619 lastBlockHash = &chainTip.Hash 2620 lastBlock, err = chain.BlockByHash(lastBlockHash) 2621 if err != nil { 2622 return nil, &btcjson.RPCError{ 2623 Code: btcjson.ErrRPCBlockNotFound, 2624 Message: "Error getting block: " + err.Error(), 2625 } 2626 } 2627 } 2628 2629 // Notify websocket client of the finished rescan. Due to how btcd 2630 // asynchronously queues notifications to not block calling code, 2631 // there is no guarantee that any of the notifications created during 2632 // rescan (such as rescanprogress, recvtx and redeemingtx) will be 2633 // received before the rescan RPC returns. Therefore, another method 2634 // is needed to safely inform clients that all rescan notifications have 2635 // been sent. 
2636 n := btcjson.NewRescanFinishedNtfn( 2637 lastBlockHash.String(), lastBlock.Height(), 2638 lastBlock.MsgBlock().Header.Timestamp.Unix(), 2639 ) 2640 if mn, err := btcjson.MarshalCmd(nil, n); err != nil { 2641 rpcsLog.Errorf("Failed to marshal rescan finished "+ 2642 "notification: %v", err) 2643 } else { 2644 // The rescan is finished, so we don't care whether the client 2645 // has disconnected at this point, so discard error. 2646 _ = wsc.QueueNotification(mn) 2647 } 2648 2649 rpcsLog.Info("Finished rescan") 2650 return nil, nil 2651} 2652 2653func init() { 2654 wsHandlers = wsHandlersBeforeInit 2655} 2656