1// Copyright (c) 2013-2017 The btcsuite developers
2// Copyright (c) 2015-2017 The Decred developers
3// Use of this source code is governed by an ISC
4// license that can be found in the LICENSE file.
5
6package main
7
8import (
9	"bytes"
10	"container/list"
11	"crypto/sha256"
12	"crypto/subtle"
13	"encoding/base64"
14	"encoding/hex"
15	"encoding/json"
16	"errors"
17	"fmt"
18	"io"
19	"math"
20	"sync"
21	"time"
22
23	"github.com/btcsuite/btcd/blockchain"
24	"github.com/btcsuite/btcd/btcjson"
25	"github.com/btcsuite/btcd/chaincfg"
26	"github.com/btcsuite/btcd/chaincfg/chainhash"
27	"github.com/btcsuite/btcd/database"
28	"github.com/btcsuite/btcd/txscript"
29	"github.com/btcsuite/btcd/wire"
30	"github.com/btcsuite/btcutil"
31	"github.com/btcsuite/websocket"
32	"golang.org/x/crypto/ripemd160"
33)
34
// websocketSendBufferSize bounds how many elements a client's send channel
// can hold before a sender blocks.  The limit only affects requests handled
// directly in the websocket client input handler or the async handler;
// notifications go through their own queuing mechanism and do not depend on
// the send channel buffer.
const websocketSendBufferSize = 50
43
// semaphore is a counting semaphore built on a buffered channel.  The
// channel capacity is the number of slots that may be held at once.
type semaphore chan struct{}

// makeSemaphore returns a semaphore with n slots.
func makeSemaphore(n int) semaphore {
	return make(semaphore, n)
}

// acquire takes one slot, blocking while the channel buffer is full.
func (s semaphore) acquire() {
	s <- struct{}{}
}

// release gives back one slot by draining a single element from the channel.
func (s semaphore) release() {
	<-s
}
52
// timeZeroVal holds the zero value of time.Time.  Sharing this single
// variable avoids creating a new zero time at every use site.
var timeZeroVal = time.Time{}
56
// wsCommandHandler describes a callback function used to handle a specific
// command.  It is passed a websocket client and an untyped command value, and
// returns an untyped result together with an error.
type wsCommandHandler func(*wsClient, interface{}) (interface{}, error)
60
61// wsHandlers maps RPC command strings to appropriate websocket handler
62// functions.  This is set by init because help references wsHandlers and thus
63// causes a dependency loop.
64var wsHandlers map[string]wsCommandHandler
65var wsHandlersBeforeInit = map[string]wsCommandHandler{
66	"loadtxfilter":              handleLoadTxFilter,
67	"help":                      handleWebsocketHelp,
68	"notifyblocks":              handleNotifyBlocks,
69	"notifynewtransactions":     handleNotifyNewTransactions,
70	"notifyreceived":            handleNotifyReceived,
71	"notifyspent":               handleNotifySpent,
72	"session":                   handleSession,
73	"stopnotifyblocks":          handleStopNotifyBlocks,
74	"stopnotifynewtransactions": handleStopNotifyNewTransactions,
75	"stopnotifyspent":           handleStopNotifySpent,
76	"stopnotifyreceived":        handleStopNotifyReceived,
77	"rescan":                    handleRescan,
78	"rescanblocks":              handleRescanBlocks,
79}
80
81// WebsocketHandler handles a new websocket client by creating a new wsClient,
82// starting it, and blocking until the connection closes.  Since it blocks, it
83// must be run in a separate goroutine.  It should be invoked from the websocket
84// server handler which runs each new connection in a new goroutine thereby
85// satisfying the requirement.
86func (s *rpcServer) WebsocketHandler(conn *websocket.Conn, remoteAddr string,
87	authenticated bool, isAdmin bool) {
88
89	// Clear the read deadline that was set before the websocket hijacked
90	// the connection.
91	conn.SetReadDeadline(timeZeroVal)
92
93	// Limit max number of websocket clients.
94	rpcsLog.Infof("New websocket client %s", remoteAddr)
95	if s.ntfnMgr.NumClients()+1 > cfg.RPCMaxWebsockets {
96		rpcsLog.Infof("Max websocket clients exceeded [%d] - "+
97			"disconnecting client %s", cfg.RPCMaxWebsockets,
98			remoteAddr)
99		conn.Close()
100		return
101	}
102
103	// Create a new websocket client to handle the new websocket connection
104	// and wait for it to shutdown.  Once it has shutdown (and hence
105	// disconnected), remove it and any notifications it registered for.
106	client, err := newWebsocketClient(s, conn, remoteAddr, authenticated, isAdmin)
107	if err != nil {
108		rpcsLog.Errorf("Failed to serve client %s: %v", remoteAddr, err)
109		conn.Close()
110		return
111	}
112	s.ntfnMgr.AddClient(client)
113	client.Start()
114	client.WaitForShutdown()
115	s.ntfnMgr.RemoveClient(client)
116	rpcsLog.Infof("Disconnected websocket client %s", remoteAddr)
117}
118
// wsNotificationManager is a connection and notification manager used for
// websockets.  It allows websocket clients to register for notifications they
// are interested in.  When an event happens elsewhere in the code such as
// transactions being added to the memory pool or block connects/disconnects,
// the notification manager is provided with the relevant details needed to
// figure out which websocket clients need to be notified based on what they
// have registered for and notifies them accordingly.  It is also used to keep
// track of all connected websocket clients.
type wsNotificationManager struct {
	// server is the RPC server the notification manager is associated with.
	server *rpcServer

	// queueNotification queues a notification for handling.  It is the
	// input channel read by the manager's queueHandler.
	queueNotification chan interface{}

	// notificationMsgs feeds notificationHandler with notifications
	// and client (un)registration requests from a queue as well as
	// registration and unregistration requests from clients.  It is the
	// output channel of the manager's queueHandler, which closes it on
	// exit.
	notificationMsgs chan interface{}

	// Access channel for current number of connected clients.
	// notificationHandler offers the current count on it and NumClients
	// receives from it.
	numClients chan int

	// Shutdown handling.  Both queueHandler and notificationHandler call
	// wg.Done when they return and stop when quit is closed.
	wg   sync.WaitGroup
	quit chan struct{}
}
146
// queueHandler manages an unbounded FIFO queue of empty interfaces, reading
// from in and sending the oldest unsent value to out.  This handler stops
// when either of the in or quit channels are closed, and closes out before
// returning, without waiting to send any values still remaining in the queue.
//
// Values are delivered to out in exactly the order they were received from
// in.  When the queue is empty and a reader is already waiting on out, a new
// value is handed over directly without being queued.
func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
	// q holds the values that have been received but not yet sent.
	var q []interface{}

	// dequeue is out while q is non-empty and nil otherwise.  A send on a
	// nil channel never proceeds, so the dequeue case below is disabled
	// whenever there is nothing queued.
	var dequeue chan<- interface{}

	// skipQueue is out while q is empty and nil otherwise, so the direct
	// hand-off is only attempted when it cannot reorder values.
	skipQueue := out

	// next is the head of the queue (q[0]) while q is non-empty.
	var next interface{}

loop:
	for {
		select {
		case n, ok := <-in:
			if !ok {
				// Sender closed input channel.
				break loop
			}

			// Either send to out immediately if skipQueue is
			// non-nil (queue is empty) and reader is ready,
			// or append to the queue and send later.
			select {
			case skipQueue <- n:
			default:
				q = append(q, n)
				dequeue = out
				skipQueue = nil
				next = q[0]
			}

		case dequeue <- next:
			// Pop the head in O(1) by reslicing rather than
			// shifting every remaining element down, which made
			// draining a long backlog quadratic.  Clear the slot
			// first so the sent value is not kept alive by the
			// backing array.
			q[0] = nil
			q = q[1:]
			if len(q) == 0 {
				// Drop the backing array and the reference to
				// the value just sent, then re-enable the
				// direct hand-off path.
				q = nil
				next = nil
				dequeue = nil
				skipQueue = out
			} else {
				next = q[0]
			}

		case <-quit:
			break loop
		}
	}
	close(out)
}
194
195// queueHandler maintains a queue of notifications and notification handler
196// control messages.
197func (m *wsNotificationManager) queueHandler() {
198	queueHandler(m.queueNotification, m.notificationMsgs, m.quit)
199	m.wg.Done()
200}
201
202// NotifyBlockConnected passes a block newly-connected to the best chain
203// to the notification manager for block and transaction notification
204// processing.
205func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) {
206	// As NotifyBlockConnected will be called by the block manager
207	// and the RPC server may no longer be running, use a select
208	// statement to unblock enqueuing the notification once the RPC
209	// server has begun shutting down.
210	select {
211	case m.queueNotification <- (*notificationBlockConnected)(block):
212	case <-m.quit:
213	}
214}
215
216// NotifyBlockDisconnected passes a block disconnected from the best chain
217// to the notification manager for block notification processing.
218func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) {
219	// As NotifyBlockDisconnected will be called by the block manager
220	// and the RPC server may no longer be running, use a select
221	// statement to unblock enqueuing the notification once the RPC
222	// server has begun shutting down.
223	select {
224	case m.queueNotification <- (*notificationBlockDisconnected)(block):
225	case <-m.quit:
226	}
227}
228
229// NotifyMempoolTx passes a transaction accepted by mempool to the
230// notification manager for transaction notification processing.  If
231// isNew is true, the tx is is a new transaction, rather than one
232// added to the mempool during a reorg.
233func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) {
234	n := &notificationTxAcceptedByMempool{
235		isNew: isNew,
236		tx:    tx,
237	}
238
239	// As NotifyMempoolTx will be called by mempool and the RPC server
240	// may no longer be running, use a select statement to unblock
241	// enqueuing the notification once the RPC server has begun
242	// shutting down.
243	select {
244	case m.queueNotification <- n:
245	case <-m.quit:
246	}
247}
248
// wsClientFilter tracks relevant addresses for each websocket client for
// the `rescanblocks` extension. It is modified by the `loadtxfilter` command.
//
// The add/exists/remove methods defined next to this type do not lock mu
// themselves; subscribedClients holds mu around its calls to them.
//
// NOTE: This extension was ported from github.com/decred/dcrd
type wsClientFilter struct {
	// mu guards the maps below when the filter is shared.
	mu sync.Mutex

	// Implemented fast paths for address lookup.  Each map is a set keyed
	// by the fixed-size hash or serialized public key of the address.
	pubKeyHashes        map[[ripemd160.Size]byte]struct{}
	scriptHashes        map[[ripemd160.Size]byte]struct{}
	compressedPubKeys   map[[33]byte]struct{}
	uncompressedPubKeys map[[65]byte]struct{}

	// A fallback address lookup map in case a fast path doesn't exist.
	// Only exists for completeness.  If using this shows up in a profile,
	// there's a good chance a fast path should be added.  Keys are the
	// encoded address strings.
	otherAddresses map[string]struct{}

	// Outpoints of unspent outputs.
	unspent map[wire.OutPoint]struct{}
}
270
271// newWSClientFilter creates a new, empty wsClientFilter struct to be used
272// for a websocket client.
273//
274// NOTE: This extension was ported from github.com/decred/dcrd
275func newWSClientFilter(addresses []string, unspentOutPoints []wire.OutPoint, params *chaincfg.Params) *wsClientFilter {
276	filter := &wsClientFilter{
277		pubKeyHashes:        map[[ripemd160.Size]byte]struct{}{},
278		scriptHashes:        map[[ripemd160.Size]byte]struct{}{},
279		compressedPubKeys:   map[[33]byte]struct{}{},
280		uncompressedPubKeys: map[[65]byte]struct{}{},
281		otherAddresses:      map[string]struct{}{},
282		unspent:             make(map[wire.OutPoint]struct{}, len(unspentOutPoints)),
283	}
284
285	for _, s := range addresses {
286		filter.addAddressStr(s, params)
287	}
288	for i := range unspentOutPoints {
289		filter.addUnspentOutPoint(&unspentOutPoints[i])
290	}
291
292	return filter
293}
294
295// addAddress adds an address to a wsClientFilter, treating it correctly based
296// on the type of address passed as an argument.
297//
298// NOTE: This extension was ported from github.com/decred/dcrd
299func (f *wsClientFilter) addAddress(a btcutil.Address) {
300	switch a := a.(type) {
301	case *btcutil.AddressPubKeyHash:
302		f.pubKeyHashes[*a.Hash160()] = struct{}{}
303		return
304	case *btcutil.AddressScriptHash:
305		f.scriptHashes[*a.Hash160()] = struct{}{}
306		return
307	case *btcutil.AddressPubKey:
308		serializedPubKey := a.ScriptAddress()
309		switch len(serializedPubKey) {
310		case 33: // compressed
311			var compressedPubKey [33]byte
312			copy(compressedPubKey[:], serializedPubKey)
313			f.compressedPubKeys[compressedPubKey] = struct{}{}
314			return
315		case 65: // uncompressed
316			var uncompressedPubKey [65]byte
317			copy(uncompressedPubKey[:], serializedPubKey)
318			f.uncompressedPubKeys[uncompressedPubKey] = struct{}{}
319			return
320		}
321	}
322
323	f.otherAddresses[a.EncodeAddress()] = struct{}{}
324}
325
326// addAddressStr parses an address from a string and then adds it to the
327// wsClientFilter using addAddress.
328//
329// NOTE: This extension was ported from github.com/decred/dcrd
330func (f *wsClientFilter) addAddressStr(s string, params *chaincfg.Params) {
331	// If address can't be decoded, no point in saving it since it should also
332	// impossible to create the address from an inspected transaction output
333	// script.
334	a, err := btcutil.DecodeAddress(s, params)
335	if err != nil {
336		return
337	}
338	f.addAddress(a)
339}
340
341// existsAddress returns true if the passed address has been added to the
342// wsClientFilter.
343//
344// NOTE: This extension was ported from github.com/decred/dcrd
345func (f *wsClientFilter) existsAddress(a btcutil.Address) bool {
346	switch a := a.(type) {
347	case *btcutil.AddressPubKeyHash:
348		_, ok := f.pubKeyHashes[*a.Hash160()]
349		return ok
350	case *btcutil.AddressScriptHash:
351		_, ok := f.scriptHashes[*a.Hash160()]
352		return ok
353	case *btcutil.AddressPubKey:
354		serializedPubKey := a.ScriptAddress()
355		switch len(serializedPubKey) {
356		case 33: // compressed
357			var compressedPubKey [33]byte
358			copy(compressedPubKey[:], serializedPubKey)
359			_, ok := f.compressedPubKeys[compressedPubKey]
360			if !ok {
361				_, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()]
362			}
363			return ok
364		case 65: // uncompressed
365			var uncompressedPubKey [65]byte
366			copy(uncompressedPubKey[:], serializedPubKey)
367			_, ok := f.uncompressedPubKeys[uncompressedPubKey]
368			if !ok {
369				_, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()]
370			}
371			return ok
372		}
373	}
374
375	_, ok := f.otherAddresses[a.EncodeAddress()]
376	return ok
377}
378
379// removeAddress removes the passed address, if it exists, from the
380// wsClientFilter.
381//
382// NOTE: This extension was ported from github.com/decred/dcrd
383func (f *wsClientFilter) removeAddress(a btcutil.Address) {
384	switch a := a.(type) {
385	case *btcutil.AddressPubKeyHash:
386		delete(f.pubKeyHashes, *a.Hash160())
387		return
388	case *btcutil.AddressScriptHash:
389		delete(f.scriptHashes, *a.Hash160())
390		return
391	case *btcutil.AddressPubKey:
392		serializedPubKey := a.ScriptAddress()
393		switch len(serializedPubKey) {
394		case 33: // compressed
395			var compressedPubKey [33]byte
396			copy(compressedPubKey[:], serializedPubKey)
397			delete(f.compressedPubKeys, compressedPubKey)
398			return
399		case 65: // uncompressed
400			var uncompressedPubKey [65]byte
401			copy(uncompressedPubKey[:], serializedPubKey)
402			delete(f.uncompressedPubKeys, uncompressedPubKey)
403			return
404		}
405	}
406
407	delete(f.otherAddresses, a.EncodeAddress())
408}
409
410// removeAddressStr parses an address from a string and then removes it from the
411// wsClientFilter using removeAddress.
412//
413// NOTE: This extension was ported from github.com/decred/dcrd
414func (f *wsClientFilter) removeAddressStr(s string, params *chaincfg.Params) {
415	a, err := btcutil.DecodeAddress(s, params)
416	if err == nil {
417		f.removeAddress(a)
418	} else {
419		delete(f.otherAddresses, s)
420	}
421}
422
423// addUnspentOutPoint adds an outpoint to the wsClientFilter.
424//
425// NOTE: This extension was ported from github.com/decred/dcrd
426func (f *wsClientFilter) addUnspentOutPoint(op *wire.OutPoint) {
427	f.unspent[*op] = struct{}{}
428}
429
430// existsUnspentOutPoint returns true if the passed outpoint has been added to
431// the wsClientFilter.
432//
433// NOTE: This extension was ported from github.com/decred/dcrd
434func (f *wsClientFilter) existsUnspentOutPoint(op *wire.OutPoint) bool {
435	_, ok := f.unspent[*op]
436	return ok
437}
438
439// removeUnspentOutPoint removes the passed outpoint, if it exists, from the
440// wsClientFilter.
441//
442// NOTE: This extension was ported from github.com/decred/dcrd
443func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) {
444	delete(f.unspent, *op)
445}
446
// Notification types
//
// These are the event messages sent through the manager's notification queue
// and dispatched on by type in notificationHandler.

// notificationBlockConnected is the block passed to NotifyBlockConnected.
type notificationBlockConnected btcutil.Block

// notificationBlockDisconnected is the block passed to
// NotifyBlockDisconnected.
type notificationBlockDisconnected btcutil.Block

// notificationTxAcceptedByMempool is the message built by NotifyMempoolTx.
type notificationTxAcceptedByMempool struct {
	// isNew is the isNew argument given to NotifyMempoolTx.
	isNew bool
	// tx is the transaction given to NotifyMempoolTx.
	tx *btcutil.Tx
}
454
// Notification control requests
//
// These are the control messages sent through the manager's notification
// queue and dispatched on by type in notificationHandler.  The types defined
// directly on wsClient are converted back to *wsClient by the handler.

// notificationRegisterClient adds the client to the set of connected clients.
type notificationRegisterClient wsClient

// notificationUnregisterClient removes the client and every request it made.
type notificationUnregisterClient wsClient

// notificationRegisterBlocks subscribes the client to block notifications.
type notificationRegisterBlocks wsClient

// notificationUnregisterBlocks unsubscribes the client from block
// notifications.
type notificationUnregisterBlocks wsClient

// notificationRegisterNewMempoolTxs subscribes the client to new mempool
// transaction notifications.
type notificationRegisterNewMempoolTxs wsClient

// notificationUnregisterNewMempoolTxs unsubscribes the client from new
// mempool transaction notifications.
type notificationUnregisterNewMempoolTxs wsClient

// notificationRegisterSpent asks to watch the outpoints in ops for wsc.
type notificationRegisterSpent struct {
	wsc *wsClient
	ops []*wire.OutPoint
}

// notificationUnregisterSpent asks to stop watching the outpoint op for wsc.
type notificationUnregisterSpent struct {
	wsc *wsClient
	op  *wire.OutPoint
}

// notificationRegisterAddr asks to watch the addresses in addrs for wsc.
type notificationRegisterAddr struct {
	wsc   *wsClient
	addrs []string
}

// notificationUnregisterAddr asks to stop watching the address addr for wsc.
type notificationUnregisterAddr struct {
	wsc  *wsClient
	addr string
}
478
479// notificationHandler reads notifications and control messages from the queue
480// handler and processes one at a time.
481func (m *wsNotificationManager) notificationHandler() {
482	// clients is a map of all currently connected websocket clients.
483	clients := make(map[chan struct{}]*wsClient)
484
485	// Maps used to hold lists of websocket clients to be notified on
486	// certain events.  Each websocket client also keeps maps for the events
487	// which have multiple triggers to make removal from these lists on
488	// connection close less horrendously expensive.
489	//
490	// Where possible, the quit channel is used as the unique id for a client
491	// since it is quite a bit more efficient than using the entire struct.
492	blockNotifications := make(map[chan struct{}]*wsClient)
493	txNotifications := make(map[chan struct{}]*wsClient)
494	watchedOutPoints := make(map[wire.OutPoint]map[chan struct{}]*wsClient)
495	watchedAddrs := make(map[string]map[chan struct{}]*wsClient)
496
497out:
498	for {
499		select {
500		case n, ok := <-m.notificationMsgs:
501			if !ok {
502				// queueHandler quit.
503				break out
504			}
505			switch n := n.(type) {
506			case *notificationBlockConnected:
507				block := (*btcutil.Block)(n)
508
509				// Skip iterating through all txs if no
510				// tx notification requests exist.
511				if len(watchedOutPoints) != 0 || len(watchedAddrs) != 0 {
512					for _, tx := range block.Transactions() {
513						m.notifyForTx(watchedOutPoints,
514							watchedAddrs, tx, block)
515					}
516				}
517
518				if len(blockNotifications) != 0 {
519					m.notifyBlockConnected(blockNotifications,
520						block)
521					m.notifyFilteredBlockConnected(blockNotifications,
522						block)
523				}
524
525			case *notificationBlockDisconnected:
526				block := (*btcutil.Block)(n)
527
528				if len(blockNotifications) != 0 {
529					m.notifyBlockDisconnected(blockNotifications,
530						block)
531					m.notifyFilteredBlockDisconnected(blockNotifications,
532						block)
533				}
534
535			case *notificationTxAcceptedByMempool:
536				if n.isNew && len(txNotifications) != 0 {
537					m.notifyForNewTx(txNotifications, n.tx)
538				}
539				m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil)
540				m.notifyRelevantTxAccepted(n.tx, clients)
541
542			case *notificationRegisterBlocks:
543				wsc := (*wsClient)(n)
544				blockNotifications[wsc.quit] = wsc
545
546			case *notificationUnregisterBlocks:
547				wsc := (*wsClient)(n)
548				delete(blockNotifications, wsc.quit)
549
550			case *notificationRegisterClient:
551				wsc := (*wsClient)(n)
552				clients[wsc.quit] = wsc
553
554			case *notificationUnregisterClient:
555				wsc := (*wsClient)(n)
556				// Remove any requests made by the client as well as
557				// the client itself.
558				delete(blockNotifications, wsc.quit)
559				delete(txNotifications, wsc.quit)
560				for k := range wsc.spentRequests {
561					op := k
562					m.removeSpentRequest(watchedOutPoints, wsc, &op)
563				}
564				for addr := range wsc.addrRequests {
565					m.removeAddrRequest(watchedAddrs, wsc, addr)
566				}
567				delete(clients, wsc.quit)
568
569			case *notificationRegisterSpent:
570				m.addSpentRequests(watchedOutPoints, n.wsc, n.ops)
571
572			case *notificationUnregisterSpent:
573				m.removeSpentRequest(watchedOutPoints, n.wsc, n.op)
574
575			case *notificationRegisterAddr:
576				m.addAddrRequests(watchedAddrs, n.wsc, n.addrs)
577
578			case *notificationUnregisterAddr:
579				m.removeAddrRequest(watchedAddrs, n.wsc, n.addr)
580
581			case *notificationRegisterNewMempoolTxs:
582				wsc := (*wsClient)(n)
583				txNotifications[wsc.quit] = wsc
584
585			case *notificationUnregisterNewMempoolTxs:
586				wsc := (*wsClient)(n)
587				delete(txNotifications, wsc.quit)
588
589			default:
590				rpcsLog.Warn("Unhandled notification type")
591			}
592
593		case m.numClients <- len(clients):
594
595		case <-m.quit:
596			// RPC server shutting down.
597			break out
598		}
599	}
600
601	for _, c := range clients {
602		c.Disconnect()
603	}
604	m.wg.Done()
605}
606
607// NumClients returns the number of clients actively being served.
608func (m *wsNotificationManager) NumClients() (n int) {
609	select {
610	case n = <-m.numClients:
611	case <-m.quit: // Use default n (0) if server has shut down.
612	}
613	return
614}
615
616// RegisterBlockUpdates requests block update notifications to the passed
617// websocket client.
618func (m *wsNotificationManager) RegisterBlockUpdates(wsc *wsClient) {
619	m.queueNotification <- (*notificationRegisterBlocks)(wsc)
620}
621
622// UnregisterBlockUpdates removes block update notifications for the passed
623// websocket client.
624func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) {
625	m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
626}
627
628// subscribedClients returns the set of all websocket client quit channels that
629// are registered to receive notifications regarding tx, either due to tx
630// spending a watched output or outputting to a watched address.  Matching
631// client's filters are updated based on this transaction's outputs and output
632// addresses that may be relevant for a client.
633func (m *wsNotificationManager) subscribedClients(tx *btcutil.Tx,
634	clients map[chan struct{}]*wsClient) map[chan struct{}]struct{} {
635
636	// Use a map of client quit channels as keys to prevent duplicates when
637	// multiple inputs and/or outputs are relevant to the client.
638	subscribed := make(map[chan struct{}]struct{})
639
640	msgTx := tx.MsgTx()
641	for _, input := range msgTx.TxIn {
642		for quitChan, wsc := range clients {
643			wsc.Lock()
644			filter := wsc.filterData
645			wsc.Unlock()
646			if filter == nil {
647				continue
648			}
649			filter.mu.Lock()
650			if filter.existsUnspentOutPoint(&input.PreviousOutPoint) {
651				subscribed[quitChan] = struct{}{}
652			}
653			filter.mu.Unlock()
654		}
655	}
656
657	for i, output := range msgTx.TxOut {
658		_, addrs, _, err := txscript.ExtractPkScriptAddrs(
659			output.PkScript, m.server.cfg.ChainParams)
660		if err != nil {
661			// Clients are not able to subscribe to
662			// nonstandard or non-address outputs.
663			continue
664		}
665		for quitChan, wsc := range clients {
666			wsc.Lock()
667			filter := wsc.filterData
668			wsc.Unlock()
669			if filter == nil {
670				continue
671			}
672			filter.mu.Lock()
673			for _, a := range addrs {
674				if filter.existsAddress(a) {
675					subscribed[quitChan] = struct{}{}
676					op := wire.OutPoint{
677						Hash:  *tx.Hash(),
678						Index: uint32(i),
679					}
680					filter.addUnspentOutPoint(&op)
681				}
682			}
683			filter.mu.Unlock()
684		}
685	}
686
687	return subscribed
688}
689
690// notifyBlockConnected notifies websocket clients that have registered for
691// block updates when a block is connected to the main chain.
692func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient,
693	block *btcutil.Block) {
694
695	// Notify interested websocket clients about the connected block.
696	ntfn := btcjson.NewBlockConnectedNtfn(block.Hash().String(), block.Height(),
697		block.MsgBlock().Header.Timestamp.Unix())
698	marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
699	if err != nil {
700		rpcsLog.Errorf("Failed to marshal block connected notification: "+
701			"%v", err)
702		return
703	}
704	for _, wsc := range clients {
705		wsc.QueueNotification(marshalledJSON)
706	}
707}
708
709// notifyBlockDisconnected notifies websocket clients that have registered for
710// block updates when a block is disconnected from the main chain (due to a
711// reorganize).
712func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*wsClient, block *btcutil.Block) {
713	// Skip notification creation if no clients have requested block
714	// connected/disconnected notifications.
715	if len(clients) == 0 {
716		return
717	}
718
719	// Notify interested websocket clients about the disconnected block.
720	ntfn := btcjson.NewBlockDisconnectedNtfn(block.Hash().String(),
721		block.Height(), block.MsgBlock().Header.Timestamp.Unix())
722	marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
723	if err != nil {
724		rpcsLog.Errorf("Failed to marshal block disconnected "+
725			"notification: %v", err)
726		return
727	}
728	for _, wsc := range clients {
729		wsc.QueueNotification(marshalledJSON)
730	}
731}
732
733// notifyFilteredBlockConnected notifies websocket clients that have registered for
734// block updates when a block is connected to the main chain.
735func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan struct{}]*wsClient,
736	block *btcutil.Block) {
737
738	// Create the common portion of the notification that is the same for
739	// every client.
740	var w bytes.Buffer
741	err := block.MsgBlock().Header.Serialize(&w)
742	if err != nil {
743		rpcsLog.Errorf("Failed to serialize header for filtered block "+
744			"connected notification: %v", err)
745		return
746	}
747	ntfn := btcjson.NewFilteredBlockConnectedNtfn(block.Height(),
748		hex.EncodeToString(w.Bytes()), nil)
749
750	// Search for relevant transactions for each client and save them
751	// serialized in hex encoding for the notification.
752	subscribedTxs := make(map[chan struct{}][]string)
753	for _, tx := range block.Transactions() {
754		var txHex string
755		for quitChan := range m.subscribedClients(tx, clients) {
756			if txHex == "" {
757				txHex = txHexString(tx.MsgTx())
758			}
759			subscribedTxs[quitChan] = append(subscribedTxs[quitChan], txHex)
760		}
761	}
762	for quitChan, wsc := range clients {
763		// Add all discovered transactions for this client. For clients
764		// that have no new-style filter, add the empty string slice.
765		ntfn.SubscribedTxs = subscribedTxs[quitChan]
766
767		// Marshal and queue notification.
768		marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
769		if err != nil {
770			rpcsLog.Errorf("Failed to marshal filtered block "+
771				"connected notification: %v", err)
772			return
773		}
774		wsc.QueueNotification(marshalledJSON)
775	}
776}
777
778// notifyFilteredBlockDisconnected notifies websocket clients that have registered for
779// block updates when a block is disconnected from the main chain (due to a
780// reorganize).
781func (*wsNotificationManager) notifyFilteredBlockDisconnected(clients map[chan struct{}]*wsClient,
782	block *btcutil.Block) {
783	// Skip notification creation if no clients have requested block
784	// connected/disconnected notifications.
785	if len(clients) == 0 {
786		return
787	}
788
789	// Notify interested websocket clients about the disconnected block.
790	var w bytes.Buffer
791	err := block.MsgBlock().Header.Serialize(&w)
792	if err != nil {
793		rpcsLog.Errorf("Failed to serialize header for filtered block "+
794			"disconnected notification: %v", err)
795		return
796	}
797	ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(),
798		hex.EncodeToString(w.Bytes()))
799	marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
800	if err != nil {
801		rpcsLog.Errorf("Failed to marshal filtered block disconnected "+
802			"notification: %v", err)
803		return
804	}
805	for _, wsc := range clients {
806		wsc.QueueNotification(marshalledJSON)
807	}
808}
809
810// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
811// client when new transactions are added to the memory pool.
812func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {
813	m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc)
814}
815
816// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket
817// client when new transaction are added to the memory pool.
818func (m *wsNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) {
819	m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc)
820}
821
822// notifyForNewTx notifies websocket clients that have registered for updates
823// when a new transaction is added to the memory pool.
824func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClient, tx *btcutil.Tx) {
825	txHashStr := tx.Hash().String()
826	mtx := tx.MsgTx()
827
828	var amount int64
829	for _, txOut := range mtx.TxOut {
830		amount += txOut.Value
831	}
832
833	ntfn := btcjson.NewTxAcceptedNtfn(txHashStr, btcutil.Amount(amount).ToBTC())
834	marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
835	if err != nil {
836		rpcsLog.Errorf("Failed to marshal tx notification: %s", err.Error())
837		return
838	}
839
840	var verboseNtfn *btcjson.TxAcceptedVerboseNtfn
841	var marshalledJSONVerbose []byte
842	for _, wsc := range clients {
843		if wsc.verboseTxUpdates {
844			if marshalledJSONVerbose != nil {
845				wsc.QueueNotification(marshalledJSONVerbose)
846				continue
847			}
848
849			net := m.server.cfg.ChainParams
850			rawTx, err := createTxRawResult(net, mtx, txHashStr, nil,
851				"", 0, 0)
852			if err != nil {
853				return
854			}
855
856			verboseNtfn = btcjson.NewTxAcceptedVerboseNtfn(*rawTx)
857			marshalledJSONVerbose, err = btcjson.MarshalCmd(nil,
858				verboseNtfn)
859			if err != nil {
860				rpcsLog.Errorf("Failed to marshal verbose tx "+
861					"notification: %s", err.Error())
862				return
863			}
864			wsc.QueueNotification(marshalledJSONVerbose)
865		} else {
866			wsc.QueueNotification(marshalledJSON)
867		}
868	}
869}
870
871// RegisterSpentRequests requests a notification when each of the passed
872// outpoints is confirmed spent (contained in a block connected to the main
873// chain) for the passed websocket client.  The request is automatically
874// removed once the notification has been sent.
875func (m *wsNotificationManager) RegisterSpentRequests(wsc *wsClient, ops []*wire.OutPoint) {
876	m.queueNotification <- &notificationRegisterSpent{
877		wsc: wsc,
878		ops: ops,
879	}
880}
881
882// addSpentRequests modifies a map of watched outpoints to sets of websocket
883// clients to add a new request watch all of the outpoints in ops and create
884// and send a notification when spent to the websocket client wsc.
885func (m *wsNotificationManager) addSpentRequests(opMap map[wire.OutPoint]map[chan struct{}]*wsClient,
886	wsc *wsClient, ops []*wire.OutPoint) {
887
888	for _, op := range ops {
889		// Track the request in the client as well so it can be quickly
890		// be removed on disconnect.
891		wsc.spentRequests[*op] = struct{}{}
892
893		// Add the client to the list to notify when the outpoint is seen.
894		// Create the list as needed.
895		cmap, ok := opMap[*op]
896		if !ok {
897			cmap = make(map[chan struct{}]*wsClient)
898			opMap[*op] = cmap
899		}
900		cmap[wsc.quit] = wsc
901	}
902
903	// Check if any transactions spending these outputs already exists in
904	// the mempool, if so send the notification immediately.
905	spends := make(map[chainhash.Hash]*btcutil.Tx)
906	for _, op := range ops {
907		spend := m.server.cfg.TxMemPool.CheckSpend(*op)
908		if spend != nil {
909			rpcsLog.Debugf("Found existing mempool spend for "+
910				"outpoint<%v>: %v", op, spend.Hash())
911			spends[*spend.Hash()] = spend
912		}
913	}
914
915	for _, spend := range spends {
916		m.notifyForTx(opMap, nil, spend, nil)
917	}
918}
919
920// UnregisterSpentRequest removes a request from the passed websocket client
921// to be notified when the passed outpoint is confirmed spent (contained in a
922// block connected to the main chain).
923func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *wire.OutPoint) {
924	m.queueNotification <- &notificationUnregisterSpent{
925		wsc: wsc,
926		op:  op,
927	}
928}
929
930// removeSpentRequest modifies a map of watched outpoints to remove the
931// websocket client wsc from the set of clients to be notified when a
932// watched outpoint is spent.  If wsc is the last client, the outpoint
933// key is removed from the map.
934func (*wsNotificationManager) removeSpentRequest(ops map[wire.OutPoint]map[chan struct{}]*wsClient,
935	wsc *wsClient, op *wire.OutPoint) {
936
937	// Remove the request tracking from the client.
938	delete(wsc.spentRequests, *op)
939
940	// Remove the client from the list to notify.
941	notifyMap, ok := ops[*op]
942	if !ok {
943		rpcsLog.Warnf("Attempt to remove nonexistent spent request "+
944			"for websocket client %s", wsc.addr)
945		return
946	}
947	delete(notifyMap, wsc.quit)
948
949	// Remove the map entry altogether if there are
950	// no more clients interested in it.
951	if len(notifyMap) == 0 {
952		delete(ops, *op)
953	}
954}
955
956// txHexString returns the serialized transaction encoded in hexadecimal.
957func txHexString(tx *wire.MsgTx) string {
958	buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize()))
959	// Ignore Serialize's error, as writing to a bytes.buffer cannot fail.
960	tx.Serialize(buf)
961	return hex.EncodeToString(buf.Bytes())
962}
963
964// blockDetails creates a BlockDetails struct to include in btcws notifications
965// from a block and a transaction's block index.
966func blockDetails(block *btcutil.Block, txIndex int) *btcjson.BlockDetails {
967	if block == nil {
968		return nil
969	}
970	return &btcjson.BlockDetails{
971		Height: block.Height(),
972		Hash:   block.Hash().String(),
973		Index:  txIndex,
974		Time:   block.MsgBlock().Header.Timestamp.Unix(),
975	}
976}
977
978// newRedeemingTxNotification returns a new marshalled redeemingtx notification
979// with the passed parameters.
980func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) {
981	// Create and marshal the notification.
982	ntfn := btcjson.NewRedeemingTxNtfn(txHex, blockDetails(block, index))
983	return btcjson.MarshalCmd(nil, ntfn)
984}
985
986// notifyForTxOuts examines each transaction output, notifying interested
987// websocket clients of the transaction if an output spends to a watched
988// address.  A spent notification request is automatically registered for
989// the client for each matching output.
990func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan struct{}]*wsClient,
991	addrs map[string]map[chan struct{}]*wsClient, tx *btcutil.Tx, block *btcutil.Block) {
992
993	// Nothing to do if nobody is listening for address notifications.
994	if len(addrs) == 0 {
995		return
996	}
997
998	txHex := ""
999	wscNotified := make(map[chan struct{}]struct{})
1000	for i, txOut := range tx.MsgTx().TxOut {
1001		_, txAddrs, _, err := txscript.ExtractPkScriptAddrs(
1002			txOut.PkScript, m.server.cfg.ChainParams)
1003		if err != nil {
1004			continue
1005		}
1006
1007		for _, txAddr := range txAddrs {
1008			cmap, ok := addrs[txAddr.EncodeAddress()]
1009			if !ok {
1010				continue
1011			}
1012
1013			if txHex == "" {
1014				txHex = txHexString(tx.MsgTx())
1015			}
1016			ntfn := btcjson.NewRecvTxNtfn(txHex, blockDetails(block,
1017				tx.Index()))
1018
1019			marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
1020			if err != nil {
1021				rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err)
1022				continue
1023			}
1024
1025			op := []*wire.OutPoint{wire.NewOutPoint(tx.Hash(), uint32(i))}
1026			for wscQuit, wsc := range cmap {
1027				m.addSpentRequests(ops, wsc, op)
1028
1029				if _, ok := wscNotified[wscQuit]; !ok {
1030					wscNotified[wscQuit] = struct{}{}
1031					wsc.QueueNotification(marshalledJSON)
1032				}
1033			}
1034		}
1035	}
1036}
1037
1038// notifyRelevantTxAccepted examines the inputs and outputs of the passed
1039// transaction, notifying websocket clients of outputs spending to a watched
1040// address and inputs spending a watched outpoint.  Any outputs paying to a
1041// watched address result in the output being watched as well for future
1042// notifications.
1043func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *btcutil.Tx,
1044	clients map[chan struct{}]*wsClient) {
1045
1046	clientsToNotify := m.subscribedClients(tx, clients)
1047
1048	if len(clientsToNotify) != 0 {
1049		n := btcjson.NewRelevantTxAcceptedNtfn(txHexString(tx.MsgTx()))
1050		marshalled, err := btcjson.MarshalCmd(nil, n)
1051		if err != nil {
1052			rpcsLog.Errorf("Failed to marshal notification: %v", err)
1053			return
1054		}
1055		for quitChan := range clientsToNotify {
1056			clients[quitChan].QueueNotification(marshalled)
1057		}
1058	}
1059}
1060
1061// notifyForTx examines the inputs and outputs of the passed transaction,
1062// notifying websocket clients of outputs spending to a watched address
1063// and inputs spending a watched outpoint.
1064func (m *wsNotificationManager) notifyForTx(ops map[wire.OutPoint]map[chan struct{}]*wsClient,
1065	addrs map[string]map[chan struct{}]*wsClient, tx *btcutil.Tx, block *btcutil.Block) {
1066
1067	if len(ops) != 0 {
1068		m.notifyForTxIns(ops, tx, block)
1069	}
1070	if len(addrs) != 0 {
1071		m.notifyForTxOuts(ops, addrs, tx, block)
1072	}
1073}
1074
1075// notifyForTxIns examines the inputs of the passed transaction and sends
1076// interested websocket clients a redeemingtx notification if any inputs
1077// spend a watched output.  If block is non-nil, any matching spent
1078// requests are removed.
1079func (m *wsNotificationManager) notifyForTxIns(ops map[wire.OutPoint]map[chan struct{}]*wsClient,
1080	tx *btcutil.Tx, block *btcutil.Block) {
1081
1082	// Nothing to do if nobody is watching outpoints.
1083	if len(ops) == 0 {
1084		return
1085	}
1086
1087	txHex := ""
1088	wscNotified := make(map[chan struct{}]struct{})
1089	for _, txIn := range tx.MsgTx().TxIn {
1090		prevOut := &txIn.PreviousOutPoint
1091		if cmap, ok := ops[*prevOut]; ok {
1092			if txHex == "" {
1093				txHex = txHexString(tx.MsgTx())
1094			}
1095			marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), block)
1096			if err != nil {
1097				rpcsLog.Warnf("Failed to marshal redeemingtx notification: %v", err)
1098				continue
1099			}
1100			for wscQuit, wsc := range cmap {
1101				if block != nil {
1102					m.removeSpentRequest(ops, wsc, prevOut)
1103				}
1104
1105				if _, ok := wscNotified[wscQuit]; !ok {
1106					wscNotified[wscQuit] = struct{}{}
1107					wsc.QueueNotification(marshalledJSON)
1108				}
1109			}
1110		}
1111	}
1112}
1113
1114// RegisterTxOutAddressRequests requests notifications to the passed websocket
1115// client when a transaction output spends to the passed address.
1116func (m *wsNotificationManager) RegisterTxOutAddressRequests(wsc *wsClient, addrs []string) {
1117	m.queueNotification <- &notificationRegisterAddr{
1118		wsc:   wsc,
1119		addrs: addrs,
1120	}
1121}
1122
1123// addAddrRequests adds the websocket client wsc to the address to client set
1124// addrMap so wsc will be notified for any mempool or block transaction outputs
1125// spending to any of the addresses in addrs.
1126func (*wsNotificationManager) addAddrRequests(addrMap map[string]map[chan struct{}]*wsClient,
1127	wsc *wsClient, addrs []string) {
1128
1129	for _, addr := range addrs {
1130		// Track the request in the client as well so it can be quickly be
1131		// removed on disconnect.
1132		wsc.addrRequests[addr] = struct{}{}
1133
1134		// Add the client to the set of clients to notify when the
1135		// outpoint is seen.  Create map as needed.
1136		cmap, ok := addrMap[addr]
1137		if !ok {
1138			cmap = make(map[chan struct{}]*wsClient)
1139			addrMap[addr] = cmap
1140		}
1141		cmap[wsc.quit] = wsc
1142	}
1143}
1144
1145// UnregisterTxOutAddressRequest removes a request from the passed websocket
1146// client to be notified when a transaction spends to the passed address.
1147func (m *wsNotificationManager) UnregisterTxOutAddressRequest(wsc *wsClient, addr string) {
1148	m.queueNotification <- &notificationUnregisterAddr{
1149		wsc:  wsc,
1150		addr: addr,
1151	}
1152}
1153
1154// removeAddrRequest removes the websocket client wsc from the address to
1155// client set addrs so it will no longer receive notification updates for
1156// any transaction outputs send to addr.
1157func (*wsNotificationManager) removeAddrRequest(addrs map[string]map[chan struct{}]*wsClient,
1158	wsc *wsClient, addr string) {
1159
1160	// Remove the request tracking from the client.
1161	delete(wsc.addrRequests, addr)
1162
1163	// Remove the client from the list to notify.
1164	cmap, ok := addrs[addr]
1165	if !ok {
1166		rpcsLog.Warnf("Attempt to remove nonexistent addr request "+
1167			"<%s> for websocket client %s", addr, wsc.addr)
1168		return
1169	}
1170	delete(cmap, wsc.quit)
1171
1172	// Remove the map entry altogether if there are no more clients
1173	// interested in it.
1174	if len(cmap) == 0 {
1175		delete(addrs, addr)
1176	}
1177}
1178
1179// AddClient adds the passed websocket client to the notification manager.
1180func (m *wsNotificationManager) AddClient(wsc *wsClient) {
1181	m.queueNotification <- (*notificationRegisterClient)(wsc)
1182}
1183
1184// RemoveClient removes the passed websocket client and all notifications
1185// registered for it.
1186func (m *wsNotificationManager) RemoveClient(wsc *wsClient) {
1187	select {
1188	case m.queueNotification <- (*notificationUnregisterClient)(wsc):
1189	case <-m.quit:
1190	}
1191}
1192
1193// Start starts the goroutines required for the manager to queue and process
1194// websocket client notifications.
1195func (m *wsNotificationManager) Start() {
1196	m.wg.Add(2)
1197	go m.queueHandler()
1198	go m.notificationHandler()
1199}
1200
// WaitForShutdown blocks until all notification manager goroutines have
// finished.  It waits on the manager's wait group, to which Start adds the
// two handler goroutines it launches.
func (m *wsNotificationManager) WaitForShutdown() {
	m.wg.Wait()
}
1206
// Shutdown shuts down the manager, stopping the notification queue and
// notification handler goroutines.  It does so by closing the manager's quit
// channel, which is also what allows a pending RemoveClient call to give up.
//
// Since closing an already closed channel panics, Shutdown must not be called
// more than once.
func (m *wsNotificationManager) Shutdown() {
	close(m.quit)
}
1212
1213// newWsNotificationManager returns a new notification manager ready for use.
1214// See wsNotificationManager for more details.
1215func newWsNotificationManager(server *rpcServer) *wsNotificationManager {
1216	return &wsNotificationManager{
1217		server:            server,
1218		queueNotification: make(chan interface{}),
1219		notificationMsgs:  make(chan interface{}),
1220		numClients:        make(chan int),
1221		quit:              make(chan struct{}),
1222	}
1223}
1224
// wsResponse houses a message to send to a connected websocket client as
// well as a channel to reply on when the message is sent.
//
// msg is the payload written to the websocket connection by the outHandler.
// doneChan is optional; when it is non-nil the outHandler sends true on it
// after the message has been written successfully, and false is sent instead
// when the message is discarded because the client is disconnected or
// shutting down.
type wsResponse struct {
	msg      []byte
	doneChan chan bool
}
1231
// wsClient provides an abstraction for handling a websocket client.  The
// overall data flow is split into 3 main goroutines which are launched by
// Start: inHandler, notificationQueueHandler, and outHandler.  A websocket
// manager is used to allow things such as broadcasting requested notifications
// to all connected websocket clients.  Inbound messages are read via the
// inHandler goroutine, and every request which passes the authentication and
// authorization checks is serviced in its own goroutine, with a semaphore
// limiting how many requests are serviced concurrently.  There are two
// outbound message types - one for responding to client requests and another
// for async notifications.  Responses to client requests use SendMessage which
// employs a buffered channel thereby limiting the number of outstanding
// requests that can be made.  Notifications are sent via QueueNotification
// which implements a queue via notificationQueueHandler to ensure sending
// notifications from other subsystems can't block.  Ultimately, all messages
// are sent via the outHandler.
//
// The embedded mutex guards the disconnected flag and the replacement of
// filterData.
type wsClient struct {
	sync.Mutex

	// server is the RPC server that is servicing the client.
	server *rpcServer

	// conn is the underlying websocket connection.
	conn *websocket.Conn

	// disconnected indicates whether or not the websocket client is
	// disconnected.
	disconnected bool

	// addr is the remote address of the client.
	addr string

	// authenticated specifies whether a client has been authenticated
	// and therefore is allowed to communicate over the websocket.
	authenticated bool

	// isAdmin specifies whether a client may change the state of the server;
	// false means its access is only to the limited set of RPC calls.
	isAdmin bool

	// sessionID is a random ID generated for each client when connected.
	// These IDs may be queried by a client using the session RPC.  A change
	// to the session ID indicates that the client reconnected.
	sessionID uint64

	// verboseTxUpdates specifies whether a client has requested verbose
	// information about all new transactions.
	verboseTxUpdates bool

	// addrRequests is a set of addresses the caller has requested to be
	// notified about.  It is maintained here so all requests can be removed
	// when a wallet disconnects.  Owned by the notification manager.
	addrRequests map[string]struct{}

	// spentRequests is a set of unspent Outpoints a wallet has requested
	// notifications for when they are spent by a processed transaction.
	// Owned by the notification manager.
	spentRequests map[wire.OutPoint]struct{}

	// filterData is the new generation transaction filter backported from
	// github.com/decred/dcrd for the new backported `loadtxfilter` and
	// `rescanblocks` methods.
	filterData *wsClientFilter

	// Networking infrastructure.
	//
	// serviceRequestSem bounds the number of requests being serviced
	// concurrently.  ntfnChan carries notifications from QueueNotification
	// to the notificationQueueHandler.  sendChan carries outgoing messages
	// from SendMessage to the outHandler.  quit is closed by Disconnect to
	// signal the client's goroutines to exit, and wg tracks the goroutines
	// launched by Start.
	serviceRequestSem semaphore
	ntfnChan          chan []byte
	sendChan          chan wsResponse
	quit              chan struct{}
	wg                sync.WaitGroup
}
1302
// inHandler handles all incoming messages for the websocket connection.  It
// must be run as a goroutine.
//
// Each message read from the connection is parsed as a JSON-RPC request.
// Until the client is authenticated, any malformed message or any request
// other than a valid authenticate request causes the client to be
// disconnected.  Once authenticated, malformed messages are answered with an
// error reply, requests from limited users are checked against the set of
// RPC methods available to them, and every remaining request is serviced in
// its own goroutine subject to the service request semaphore.
//
// When the loop exits the client is disconnected and the handler is marked
// done on the client's wait group.
func (c *wsClient) inHandler() {
out:
	for {
		// Break out of the loop once the quit channel has been closed.
		// Use a non-blocking select here so we fall through otherwise.
		select {
		case <-c.quit:
			break out
		default:
		}

		_, msg, err := c.conn.ReadMessage()
		if err != nil {
			// Log the error if it's not due to disconnecting.
			if err != io.EOF {
				rpcsLog.Errorf("Websocket receive error from "+
					"%s: %v", c.addr, err)
			}
			break out
		}

		var request btcjson.Request
		err = json.Unmarshal(msg, &request)
		if err != nil {
			// Unauthenticated clients are disconnected rather than
			// being sent a parse error.
			if !c.authenticated {
				break out
			}

			jsonErr := &btcjson.RPCError{
				Code:    btcjson.ErrRPCParse.Code,
				Message: "Failed to parse request: " + err.Error(),
			}
			reply, err := createMarshalledReply(nil, nil, jsonErr)
			if err != nil {
				rpcsLog.Errorf("Failed to marshal parse failure "+
					"reply: %v", err)
				continue
			}
			c.SendMessage(reply, nil)
			continue
		}

		// The JSON-RPC 1.0 spec defines that notifications must have their "id"
		// set to null and states that notifications do not have a response.
		//
		// A JSON-RPC 2.0 notification is a request with "json-rpc":"2.0", and
		// without an "id" member. The specification states that notifications
		// must not be responded to. JSON-RPC 2.0 permits the null value as a
		// valid request id, therefore such requests are not notifications.
		//
		// Bitcoin Core serves requests with "id":null or even an absent "id",
		// and responds to such requests with "id":null in the response.
		//
		// Btcd does not respond to any request without an "id" or with
		// "id":null, regardless of the indicated JSON-RPC protocol version,
		// unless RPC quirks are enabled. With RPC quirks enabled, such
		// requests will be responded to if the request does not indicate a
		// JSON-RPC version.
		//
		// RPC quirks can be enabled by the user to avoid compatibility issues
		// with software relying on Core's behavior.
		if request.ID == nil && !(cfg.RPCQuirks && request.Jsonrpc == "") {
			// Notifications are silently ignored for authenticated
			// clients, but an unauthenticated client is disconnected.
			if !c.authenticated {
				break out
			}
			continue
		}

		cmd := parseCmd(&request)
		if cmd.err != nil {
			// Unauthenticated clients are disconnected rather than
			// being sent the parse error.
			if !c.authenticated {
				break out
			}

			reply, err := createMarshalledReply(cmd.id, nil, cmd.err)
			if err != nil {
				rpcsLog.Errorf("Failed to marshal parse failure "+
					"reply: %v", err)
				continue
			}
			c.SendMessage(reply, nil)
			continue
		}
		rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr)

		// Check auth.  The client is immediately disconnected if the
		// first request of an unauthenticated websocket client is not
		// the authenticate request, an authenticate request is received
		// when the client is already authenticated, or incorrect
		// authentication credentials are provided in the request.
		switch authCmd, ok := cmd.cmd.(*btcjson.AuthenticateCmd); {
		case c.authenticated && ok:
			rpcsLog.Warnf("Websocket client %s is already authenticated",
				c.addr)
			break out
		case !c.authenticated && !ok:
			rpcsLog.Warnf("Unauthenticated websocket message " +
				"received")
			break out
		case !c.authenticated:
			// Check credentials.  The SHA-256 of the equivalent HTTP
			// Basic auth string is compared in constant time against
			// both the admin and the limited user hashes held by the
			// server.
			login := authCmd.Username + ":" + authCmd.Passphrase
			auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
			authSha := sha256.Sum256([]byte(auth))
			cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
			limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
			if cmp != 1 && limitcmp != 1 {
				rpcsLog.Warnf("Auth failure.")
				break out
			}
			c.authenticated = true
			c.isAdmin = cmp == 1

			// Marshal and send response.
			reply, err := createMarshalledReply(cmd.id, nil, nil)
			if err != nil {
				rpcsLog.Errorf("Failed to marshal authenticate reply: "+
					"%v", err.Error())
				continue
			}
			c.SendMessage(reply, nil)
			continue
		}

		// Check if the client is using limited RPC credentials and
		// error when not authorized to call this RPC.
		if !c.isAdmin {
			if _, ok := rpcLimited[request.Method]; !ok {
				jsonErr := &btcjson.RPCError{
					Code:    btcjson.ErrRPCInvalidParams.Code,
					Message: "limited user not authorized for this method",
				}
				// Marshal and send response.
				reply, err := createMarshalledReply(request.ID, nil, jsonErr)
				if err != nil {
					rpcsLog.Errorf("Failed to marshal parse failure "+
						"reply: %v", err)
					continue
				}
				c.SendMessage(reply, nil)
				continue
			}
		}

		// Asynchronously handle the request.  A semaphore is used to
		// limit the number of concurrent requests currently being
		// serviced.  If the semaphore can not be acquired, simply wait
		// until a request finishes before reading the next RPC request
		// from the websocket client.
		//
		// This could be a little fancier by timing out and erroring
		// when it takes too long to service the request, but if that is
		// done, the read of the next request should not be blocked by
		// this semaphore, otherwise the next request will be read and
		// will probably sit here for another few seconds before timing
		// out as well.  This will cause the total timeout duration for
		// later requests to be much longer than the check here would
		// imply.
		//
		// If a timeout is added, the semaphore acquiring should be
		// moved inside of the new goroutine with a select statement
		// that also reads a time.After channel.  This will unblock the
		// read of the next request from the websocket client and allow
		// many requests to be waited on concurrently.
		c.serviceRequestSem.acquire()
		go func() {
			c.serviceRequest(cmd)
			c.serviceRequestSem.release()
		}()
	}

	// Ensure the connection is closed.
	c.Disconnect()
	c.wg.Done()
	rpcsLog.Tracef("Websocket client input handler done for %s", c.addr)
}
1480
1481// serviceRequest services a parsed RPC request by looking up and executing the
1482// appropriate RPC handler.  The response is marshalled and sent to the
1483// websocket client.
1484func (c *wsClient) serviceRequest(r *parsedRPCCmd) {
1485	var (
1486		result interface{}
1487		err    error
1488	)
1489
1490	// Lookup the websocket extension for the command and if it doesn't
1491	// exist fallback to handling the command as a standard command.
1492	wsHandler, ok := wsHandlers[r.method]
1493	if ok {
1494		result, err = wsHandler(c, r.cmd)
1495	} else {
1496		result, err = c.server.standardCmdResult(r, nil)
1497	}
1498	reply, err := createMarshalledReply(r.id, result, err)
1499	if err != nil {
1500		rpcsLog.Errorf("Failed to marshal reply for <%s> "+
1501			"command: %v", r.method, err)
1502		return
1503	}
1504	c.SendMessage(reply, nil)
1505}
1506
// notificationQueueHandler handles the queuing of outgoing notifications for
// the websocket client.  This runs as a muxer for various sources of input to
// ensure that queuing up notifications to be sent will not block.  Otherwise,
// slow clients could bog down the other systems (such as the mempool or block
// manager) which are queuing the data.  The data is passed on to outHandler to
// actually be written.  It must be run as a goroutine.
//
// At most one notification is handed to SendMessage at a time; any further
// notifications are held in an in-memory list until the previous send has
// been reported as finished.  The handler exits once the client's quit
// channel is closed and marks itself done on the client's wait group.
func (c *wsClient) notificationQueueHandler() {
	ntfnSentChan := make(chan bool, 1) // nonblocking sync

	// pendingNtfns is used as a queue for notifications that are ready to
	// be sent once there are no outstanding notifications currently being
	// sent.  The waiting flag is used over simply checking for items in the
	// pending list to ensure cleanup knows what has and hasn't been sent
	// to the outHandler.  Currently no special cleanup is needed, however
	// if something like a done channel is added to notifications in the
	// future, not knowing what has and hasn't been sent to the outHandler
	// (and thus who should respond to the done channel) would be
	// problematic without using this approach.
	pendingNtfns := list.New()
	waiting := false
out:
	for {
		select {
		// This channel is notified when a message is being queued to
		// be sent across the network socket.  It will either send the
		// message immediately if a send is not already in progress, or
		// queue the message to be sent once the other pending messages
		// are sent.
		case msg := <-c.ntfnChan:
			if !waiting {
				c.SendMessage(msg, ntfnSentChan)
			} else {
				pendingNtfns.PushBack(msg)
			}
			// Either way a send is now outstanding.
			waiting = true

		// This channel is notified when a notification has been sent
		// across the network socket.
		case <-ntfnSentChan:
			// No longer waiting if there are no more messages in
			// the pending messages queue.
			next := pendingNtfns.Front()
			if next == nil {
				waiting = false
				continue
			}

			// Notify the outHandler about the next item to
			// asynchronously send.
			msg := pendingNtfns.Remove(next).([]byte)
			c.SendMessage(msg, ntfnSentChan)

		case <-c.quit:
			break out
		}
	}

	// Drain any wait channels before exiting so nothing is left waiting
	// around to send.  Only what is immediately available is drained; the
	// default case ends the loop as soon as both channels are empty.
cleanup:
	for {
		select {
		case <-c.ntfnChan:
		case <-ntfnSentChan:
		default:
			break cleanup
		}
	}
	c.wg.Done()
	rpcsLog.Tracef("Websocket client notification queue handler done "+
		"for %s", c.addr)
}
1579
// outHandler handles all outgoing messages for the websocket connection.  It
// uses a buffered channel to serialize output messages while allowing the
// sender to continue running asynchronously.  It must be run as a goroutine.
//
// Every message received on the send channel is written to the connection as
// a text message.  When a message carries a done channel, true is sent on it
// after a successful write.  A write failure disconnects the client and ends
// the handler, as does the closing of the client's quit channel.  On exit the
// handler marks itself done on the client's wait group.
func (c *wsClient) outHandler() {
out:
	for {
		// Send any messages ready for send until the quit channel is
		// closed.
		select {
		case r := <-c.sendChan:
			err := c.conn.WriteMessage(websocket.TextMessage, r.msg)
			if err != nil {
				c.Disconnect()
				break out
			}
			if r.doneChan != nil {
				r.doneChan <- true
			}

		case <-c.quit:
			break out
		}
	}

	// Drain any wait channels before exiting so nothing is left waiting
	// around to send.  Messages still sitting in the send channel are
	// discarded, with false reported on their done channel, if any.
cleanup:
	for {
		select {
		case r := <-c.sendChan:
			if r.doneChan != nil {
				r.doneChan <- false
			}
		default:
			break cleanup
		}
	}
	c.wg.Done()
	rpcsLog.Tracef("Websocket client output handler done for %s", c.addr)
}
1621
1622// SendMessage sends the passed json to the websocket client.  It is backed
1623// by a buffered channel, so it will not block until the send channel is full.
1624// Note however that QueueNotification must be used for sending async
1625// notifications instead of the this function.  This approach allows a limit to
1626// the number of outstanding requests a client can make without preventing or
1627// blocking on async notifications.
1628func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
1629	// Don't send the message if disconnected.
1630	if c.Disconnected() {
1631		if doneChan != nil {
1632			doneChan <- false
1633		}
1634		return
1635	}
1636
1637	c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan}
1638}
1639
// ErrClientQuit describes the error where a client send is not processed due
// to the client having already been disconnected or dropped.  It is returned
// by QueueNotification when the client is disconnected.
var ErrClientQuit = errors.New("client quit")
1643
1644// QueueNotification queues the passed notification to be sent to the websocket
1645// client.  This function, as the name implies, is only intended for
1646// notifications since it has additional logic to prevent other subsystems, such
1647// as the memory pool and block manager, from blocking even when the send
1648// channel is full.
1649//
1650// If the client is in the process of shutting down, this function returns
1651// ErrClientQuit.  This is intended to be checked by long-running notification
1652// handlers to stop processing if there is no more work needed to be done.
1653func (c *wsClient) QueueNotification(marshalledJSON []byte) error {
1654	// Don't queue the message if disconnected.
1655	if c.Disconnected() {
1656		return ErrClientQuit
1657	}
1658
1659	c.ntfnChan <- marshalledJSON
1660	return nil
1661}
1662
1663// Disconnected returns whether or not the websocket client is disconnected.
1664func (c *wsClient) Disconnected() bool {
1665	c.Lock()
1666	isDisconnected := c.disconnected
1667	c.Unlock()
1668
1669	return isDisconnected
1670}
1671
1672// Disconnect disconnects the websocket client.
1673func (c *wsClient) Disconnect() {
1674	c.Lock()
1675	defer c.Unlock()
1676
1677	// Nothing to do if already disconnected.
1678	if c.disconnected {
1679		return
1680	}
1681
1682	rpcsLog.Tracef("Disconnecting websocket client %s", c.addr)
1683	close(c.quit)
1684	c.conn.Close()
1685	c.disconnected = true
1686}
1687
1688// Start begins processing input and output messages.
1689func (c *wsClient) Start() {
1690	rpcsLog.Tracef("Starting websocket client %s", c.addr)
1691
1692	// Start processing input and output.
1693	c.wg.Add(3)
1694	go c.inHandler()
1695	go c.notificationQueueHandler()
1696	go c.outHandler()
1697}
1698
// WaitForShutdown blocks until the websocket client goroutines are stopped
// and the connection is closed.  It waits on the client's wait group, to
// which Start adds the three handler goroutines it launches.
func (c *wsClient) WaitForShutdown() {
	c.wg.Wait()
}
1704
1705// newWebsocketClient returns a new websocket client given the notification
1706// manager, websocket connection, remote address, and whether or not the client
1707// has already been authenticated (via HTTP Basic access authentication).  The
1708// returned client is ready to start.  Once started, the client will process
1709// incoming and outgoing messages in separate goroutines complete with queuing
1710// and asynchrous handling for long-running operations.
1711func newWebsocketClient(server *rpcServer, conn *websocket.Conn,
1712	remoteAddr string, authenticated bool, isAdmin bool) (*wsClient, error) {
1713
1714	sessionID, err := wire.RandomUint64()
1715	if err != nil {
1716		return nil, err
1717	}
1718
1719	client := &wsClient{
1720		conn:              conn,
1721		addr:              remoteAddr,
1722		authenticated:     authenticated,
1723		isAdmin:           isAdmin,
1724		sessionID:         sessionID,
1725		server:            server,
1726		addrRequests:      make(map[string]struct{}),
1727		spentRequests:     make(map[wire.OutPoint]struct{}),
1728		serviceRequestSem: makeSemaphore(cfg.RPCMaxConcurrentReqs),
1729		ntfnChan:          make(chan []byte, 1), // nonblocking sync
1730		sendChan:          make(chan wsResponse, websocketSendBufferSize),
1731		quit:              make(chan struct{}),
1732	}
1733	return client, nil
1734}
1735
1736// handleWebsocketHelp implements the help command for websocket connections.
1737func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) {
1738	cmd, ok := icmd.(*btcjson.HelpCmd)
1739	if !ok {
1740		return nil, btcjson.ErrRPCInternal
1741	}
1742
1743	// Provide a usage overview of all commands when no specific command
1744	// was specified.
1745	var command string
1746	if cmd.Command != nil {
1747		command = *cmd.Command
1748	}
1749	if command == "" {
1750		usage, err := wsc.server.helpCacher.rpcUsage(true)
1751		if err != nil {
1752			context := "Failed to generate RPC usage"
1753			return nil, internalRPCError(err.Error(), context)
1754		}
1755		return usage, nil
1756	}
1757
1758	// Check that the command asked for is supported and implemented.
1759	// Search the list of websocket handlers as well as the main list of
1760	// handlers since help should only be provided for those cases.
1761	valid := true
1762	if _, ok := rpcHandlers[command]; !ok {
1763		if _, ok := wsHandlers[command]; !ok {
1764			valid = false
1765		}
1766	}
1767	if !valid {
1768		return nil, &btcjson.RPCError{
1769			Code:    btcjson.ErrRPCInvalidParameter,
1770			Message: "Unknown command: " + command,
1771		}
1772	}
1773
1774	// Get the help for the command.
1775	help, err := wsc.server.helpCacher.rpcMethodHelp(command)
1776	if err != nil {
1777		context := "Failed to generate help"
1778		return nil, internalRPCError(err.Error(), context)
1779	}
1780	return help, nil
1781}
1782
1783// handleLoadTxFilter implements the loadtxfilter command extension for
1784// websocket connections.
1785//
1786// NOTE: This extension is ported from github.com/decred/dcrd
1787func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) {
1788	cmd := icmd.(*btcjson.LoadTxFilterCmd)
1789
1790	outPoints := make([]wire.OutPoint, len(cmd.OutPoints))
1791	for i := range cmd.OutPoints {
1792		hash, err := chainhash.NewHashFromStr(cmd.OutPoints[i].Hash)
1793		if err != nil {
1794			return nil, &btcjson.RPCError{
1795				Code:    btcjson.ErrRPCInvalidParameter,
1796				Message: err.Error(),
1797			}
1798		}
1799		outPoints[i] = wire.OutPoint{
1800			Hash:  *hash,
1801			Index: cmd.OutPoints[i].Index,
1802		}
1803	}
1804
1805	params := wsc.server.cfg.ChainParams
1806
1807	wsc.Lock()
1808	if cmd.Reload || wsc.filterData == nil {
1809		wsc.filterData = newWSClientFilter(cmd.Addresses, outPoints,
1810			params)
1811		wsc.Unlock()
1812	} else {
1813		wsc.Unlock()
1814
1815		wsc.filterData.mu.Lock()
1816		for _, a := range cmd.Addresses {
1817			wsc.filterData.addAddressStr(a, params)
1818		}
1819		for i := range outPoints {
1820			wsc.filterData.addUnspentOutPoint(&outPoints[i])
1821		}
1822		wsc.filterData.mu.Unlock()
1823	}
1824
1825	return nil, nil
1826}
1827
1828// handleNotifyBlocks implements the notifyblocks command extension for
1829// websocket connections.
1830func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
1831	wsc.server.ntfnMgr.RegisterBlockUpdates(wsc)
1832	return nil, nil
1833}
1834
1835// handleSession implements the session command extension for websocket
1836// connections.
1837func handleSession(wsc *wsClient, icmd interface{}) (interface{}, error) {
1838	return &btcjson.SessionResult{SessionID: wsc.sessionID}, nil
1839}
1840
1841// handleStopNotifyBlocks implements the stopnotifyblocks command extension for
1842// websocket connections.
1843func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
1844	wsc.server.ntfnMgr.UnregisterBlockUpdates(wsc)
1845	return nil, nil
1846}
1847
1848// handleNotifySpent implements the notifyspent command extension for
1849// websocket connections.
1850func handleNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) {
1851	cmd, ok := icmd.(*btcjson.NotifySpentCmd)
1852	if !ok {
1853		return nil, btcjson.ErrRPCInternal
1854	}
1855
1856	outpoints, err := deserializeOutpoints(cmd.OutPoints)
1857	if err != nil {
1858		return nil, err
1859	}
1860
1861	wsc.server.ntfnMgr.RegisterSpentRequests(wsc, outpoints)
1862	return nil, nil
1863}
1864
1865// handleNotifyNewTransations implements the notifynewtransactions command
1866// extension for websocket connections.
1867func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) {
1868	cmd, ok := icmd.(*btcjson.NotifyNewTransactionsCmd)
1869	if !ok {
1870		return nil, btcjson.ErrRPCInternal
1871	}
1872
1873	wsc.verboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose
1874	wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc)
1875	return nil, nil
1876}
1877
1878// handleStopNotifyNewTransations implements the stopnotifynewtransactions
1879// command extension for websocket connections.
1880func handleStopNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) {
1881	wsc.server.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc)
1882	return nil, nil
1883}
1884
1885// handleNotifyReceived implements the notifyreceived command extension for
1886// websocket connections.
1887func handleNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) {
1888	cmd, ok := icmd.(*btcjson.NotifyReceivedCmd)
1889	if !ok {
1890		return nil, btcjson.ErrRPCInternal
1891	}
1892
1893	// Decode addresses to validate input, but the strings slice is used
1894	// directly if these are all ok.
1895	err := checkAddressValidity(cmd.Addresses, wsc.server.cfg.ChainParams)
1896	if err != nil {
1897		return nil, err
1898	}
1899
1900	wsc.server.ntfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
1901	return nil, nil
1902}
1903
1904// handleStopNotifySpent implements the stopnotifyspent command extension for
1905// websocket connections.
1906func handleStopNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) {
1907	cmd, ok := icmd.(*btcjson.StopNotifySpentCmd)
1908	if !ok {
1909		return nil, btcjson.ErrRPCInternal
1910	}
1911
1912	outpoints, err := deserializeOutpoints(cmd.OutPoints)
1913	if err != nil {
1914		return nil, err
1915	}
1916
1917	for _, outpoint := range outpoints {
1918		wsc.server.ntfnMgr.UnregisterSpentRequest(wsc, outpoint)
1919	}
1920
1921	return nil, nil
1922}
1923
1924// handleStopNotifyReceived implements the stopnotifyreceived command extension
1925// for websocket connections.
1926func handleStopNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) {
1927	cmd, ok := icmd.(*btcjson.StopNotifyReceivedCmd)
1928	if !ok {
1929		return nil, btcjson.ErrRPCInternal
1930	}
1931
1932	// Decode addresses to validate input, but the strings slice is used
1933	// directly if these are all ok.
1934	err := checkAddressValidity(cmd.Addresses, wsc.server.cfg.ChainParams)
1935	if err != nil {
1936		return nil, err
1937	}
1938
1939	for _, addr := range cmd.Addresses {
1940		wsc.server.ntfnMgr.UnregisterTxOutAddressRequest(wsc, addr)
1941	}
1942
1943	return nil, nil
1944}
1945
1946// checkAddressValidity checks the validity of each address in the passed
1947// string slice. It does this by attempting to decode each address using the
1948// current active network parameters. If any single address fails to decode
1949// properly, the function returns an error. Otherwise, nil is returned.
1950func checkAddressValidity(addrs []string, params *chaincfg.Params) error {
1951	for _, addr := range addrs {
1952		_, err := btcutil.DecodeAddress(addr, params)
1953		if err != nil {
1954			return &btcjson.RPCError{
1955				Code: btcjson.ErrRPCInvalidAddressOrKey,
1956				Message: fmt.Sprintf("Invalid address or key: %v",
1957					addr),
1958			}
1959		}
1960	}
1961	return nil
1962}
1963
1964// deserializeOutpoints deserializes each serialized outpoint.
1965func deserializeOutpoints(serializedOuts []btcjson.OutPoint) ([]*wire.OutPoint, error) {
1966	outpoints := make([]*wire.OutPoint, 0, len(serializedOuts))
1967	for i := range serializedOuts {
1968		blockHash, err := chainhash.NewHashFromStr(serializedOuts[i].Hash)
1969		if err != nil {
1970			return nil, rpcDecodeHexError(serializedOuts[i].Hash)
1971		}
1972		index := serializedOuts[i].Index
1973		outpoints = append(outpoints, wire.NewOutPoint(blockHash, index))
1974	}
1975
1976	return outpoints, nil
1977}
1978
1979type rescanKeys struct {
1980	addrs   map[string]struct{}
1981	unspent map[wire.OutPoint]struct{}
1982}
1983
1984// unspentSlice returns a slice of currently-unspent outpoints for the rescan
1985// lookup keys.  This is primarily intended to be used to register outpoints
1986// for continuous notifications after a rescan has completed.
1987func (r *rescanKeys) unspentSlice() []*wire.OutPoint {
1988	ops := make([]*wire.OutPoint, 0, len(r.unspent))
1989	for op := range r.unspent {
1990		opCopy := op
1991		ops = append(ops, &opCopy)
1992	}
1993	return ops
1994}
1995
1996// ErrRescanReorg defines the error that is returned when an unrecoverable
1997// reorganize is detected during a rescan.
1998var ErrRescanReorg = btcjson.RPCError{
1999	Code:    btcjson.ErrRPCDatabase,
2000	Message: "Reorganize",
2001}
2002
2003// rescanBlock rescans all transactions in a single block.  This is a helper
2004// function for handleRescan.
2005func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) {
2006	for _, tx := range blk.Transactions() {
2007		// Hexadecimal representation of this tx.  Only created if
2008		// needed, and reused for later notifications if already made.
2009		var txHex string
2010
2011		// All inputs and outputs must be iterated through to correctly
2012		// modify the unspent map, however, just a single notification
2013		// for any matching transaction inputs or outputs should be
2014		// created and sent.
2015		spentNotified := false
2016		recvNotified := false
2017
2018		// notifySpend is a closure we'll use when we first detect that
2019		// a transactions spends an outpoint/script in our filter list.
2020		notifySpend := func() error {
2021			if txHex == "" {
2022				txHex = txHexString(tx.MsgTx())
2023			}
2024			marshalledJSON, err := newRedeemingTxNotification(
2025				txHex, tx.Index(), blk,
2026			)
2027			if err != nil {
2028				return fmt.Errorf("unable to marshal "+
2029					"btcjson.RedeeminTxNtfn: %v", err)
2030			}
2031
2032			return wsc.QueueNotification(marshalledJSON)
2033		}
2034
2035		// We'll start by iterating over the transaction's inputs to
2036		// determine if it spends an outpoint/script in our filter list.
2037		for _, txin := range tx.MsgTx().TxIn {
2038			// If it spends an outpoint, we'll dispatch a spend
2039			// notification for the transaction.
2040			if _, ok := lookups.unspent[txin.PreviousOutPoint]; ok {
2041				delete(lookups.unspent, txin.PreviousOutPoint)
2042
2043				if spentNotified {
2044					continue
2045				}
2046
2047				err := notifySpend()
2048
2049				// Stop the rescan early if the websocket client
2050				// disconnected.
2051				if err == ErrClientQuit {
2052					return
2053				}
2054				if err != nil {
2055					rpcsLog.Errorf("Unable to notify "+
2056						"redeeming transaction %v: %v",
2057						tx.Hash(), err)
2058					continue
2059				}
2060
2061				spentNotified = true
2062			}
2063
2064			// We'll also recompute the pkScript the input is
2065			// attempting to spend to determine whether it is
2066			// relevant to us.
2067			pkScript, err := txscript.ComputePkScript(
2068				txin.SignatureScript, txin.Witness,
2069			)
2070			if err != nil {
2071				continue
2072			}
2073			addr, err := pkScript.Address(wsc.server.cfg.ChainParams)
2074			if err != nil {
2075				continue
2076			}
2077
2078			// If it is, we'll also dispatch a spend notification
2079			// for this transaction if we haven't already.
2080			if _, ok := lookups.addrs[addr.String()]; ok {
2081				if spentNotified {
2082					continue
2083				}
2084
2085				err := notifySpend()
2086
2087				// Stop the rescan early if the websocket client
2088				// disconnected.
2089				if err == ErrClientQuit {
2090					return
2091				}
2092				if err != nil {
2093					rpcsLog.Errorf("Unable to notify "+
2094						"redeeming transaction %v: %v",
2095						tx.Hash(), err)
2096					continue
2097				}
2098
2099				spentNotified = true
2100			}
2101		}
2102
2103		for txOutIdx, txout := range tx.MsgTx().TxOut {
2104			_, addrs, _, _ := txscript.ExtractPkScriptAddrs(
2105				txout.PkScript, wsc.server.cfg.ChainParams)
2106
2107			for _, addr := range addrs {
2108				if _, ok := lookups.addrs[addr.String()]; !ok {
2109					continue
2110				}
2111
2112				outpoint := wire.OutPoint{
2113					Hash:  *tx.Hash(),
2114					Index: uint32(txOutIdx),
2115				}
2116				lookups.unspent[outpoint] = struct{}{}
2117
2118				if recvNotified {
2119					continue
2120				}
2121
2122				if txHex == "" {
2123					txHex = txHexString(tx.MsgTx())
2124				}
2125				ntfn := btcjson.NewRecvTxNtfn(txHex,
2126					blockDetails(blk, tx.Index()))
2127
2128				marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
2129				if err != nil {
2130					rpcsLog.Errorf("Failed to marshal recvtx notification: %v", err)
2131					return
2132				}
2133
2134				err = wsc.QueueNotification(marshalledJSON)
2135				// Stop the rescan early if the websocket client
2136				// disconnected.
2137				if err == ErrClientQuit {
2138					return
2139				}
2140				recvNotified = true
2141			}
2142		}
2143	}
2144}
2145
2146// rescanBlockFilter rescans a block for any relevant transactions for the
2147// passed lookup keys. Any discovered transactions are returned hex encoded as
2148// a string slice.
2149//
2150// NOTE: This extension is ported from github.com/decred/dcrd
2151func rescanBlockFilter(filter *wsClientFilter, block *btcutil.Block, params *chaincfg.Params) []string {
2152	var transactions []string
2153
2154	filter.mu.Lock()
2155	for _, tx := range block.Transactions() {
2156		msgTx := tx.MsgTx()
2157
2158		// Keep track of whether the transaction has already been added
2159		// to the result.  It shouldn't be added twice.
2160		added := false
2161
2162		// Scan inputs if not a coinbase transaction.
2163		if !blockchain.IsCoinBaseTx(msgTx) {
2164			for _, input := range msgTx.TxIn {
2165				if !filter.existsUnspentOutPoint(&input.PreviousOutPoint) {
2166					continue
2167				}
2168				if !added {
2169					transactions = append(
2170						transactions,
2171						txHexString(msgTx))
2172					added = true
2173				}
2174			}
2175		}
2176
2177		// Scan outputs.
2178		for i, output := range msgTx.TxOut {
2179			_, addrs, _, err := txscript.ExtractPkScriptAddrs(
2180				output.PkScript, params)
2181			if err != nil {
2182				continue
2183			}
2184			for _, a := range addrs {
2185				if !filter.existsAddress(a) {
2186					continue
2187				}
2188
2189				op := wire.OutPoint{
2190					Hash:  *tx.Hash(),
2191					Index: uint32(i),
2192				}
2193				filter.addUnspentOutPoint(&op)
2194
2195				if !added {
2196					transactions = append(
2197						transactions,
2198						txHexString(msgTx))
2199					added = true
2200				}
2201			}
2202		}
2203	}
2204	filter.mu.Unlock()
2205
2206	return transactions
2207}
2208
2209// handleRescanBlocks implements the rescanblocks command extension for
2210// websocket connections.
2211//
2212// NOTE: This extension is ported from github.com/decred/dcrd
2213func handleRescanBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
2214	cmd, ok := icmd.(*btcjson.RescanBlocksCmd)
2215	if !ok {
2216		return nil, btcjson.ErrRPCInternal
2217	}
2218
2219	// Load client's transaction filter.  Must exist in order to continue.
2220	wsc.Lock()
2221	filter := wsc.filterData
2222	wsc.Unlock()
2223	if filter == nil {
2224		return nil, &btcjson.RPCError{
2225			Code:    btcjson.ErrRPCMisc,
2226			Message: "Transaction filter must be loaded before rescanning",
2227		}
2228	}
2229
2230	blockHashes := make([]*chainhash.Hash, len(cmd.BlockHashes))
2231
2232	for i := range cmd.BlockHashes {
2233		hash, err := chainhash.NewHashFromStr(cmd.BlockHashes[i])
2234		if err != nil {
2235			return nil, err
2236		}
2237		blockHashes[i] = hash
2238	}
2239
2240	discoveredData := make([]btcjson.RescannedBlock, 0, len(blockHashes))
2241
2242	// Iterate over each block in the request and rescan.  When a block
2243	// contains relevant transactions, add it to the response.
2244	bc := wsc.server.cfg.Chain
2245	params := wsc.server.cfg.ChainParams
2246	var lastBlockHash *chainhash.Hash
2247	for i := range blockHashes {
2248		block, err := bc.BlockByHash(blockHashes[i])
2249		if err != nil {
2250			return nil, &btcjson.RPCError{
2251				Code:    btcjson.ErrRPCBlockNotFound,
2252				Message: "Failed to fetch block: " + err.Error(),
2253			}
2254		}
2255		if lastBlockHash != nil && block.MsgBlock().Header.PrevBlock != *lastBlockHash {
2256			return nil, &btcjson.RPCError{
2257				Code: btcjson.ErrRPCInvalidParameter,
2258				Message: fmt.Sprintf("Block %v is not a child of %v",
2259					blockHashes[i], lastBlockHash),
2260			}
2261		}
2262		lastBlockHash = blockHashes[i]
2263
2264		transactions := rescanBlockFilter(filter, block, params)
2265		if len(transactions) != 0 {
2266			discoveredData = append(discoveredData, btcjson.RescannedBlock{
2267				Hash:         cmd.BlockHashes[i],
2268				Transactions: transactions,
2269			})
2270		}
2271	}
2272
2273	return &discoveredData, nil
2274}
2275
2276// recoverFromReorg attempts to recover from a detected reorganize during a
2277// rescan.  It fetches a new range of block shas from the database and
2278// verifies that the new range of blocks is on the same fork as a previous
2279// range of blocks.  If this condition does not hold true, the JSON-RPC error
2280// for an unrecoverable reorganize is returned.
2281func recoverFromReorg(chain *blockchain.BlockChain, minBlock, maxBlock int32,
2282	lastBlock *chainhash.Hash) ([]chainhash.Hash, error) {
2283
2284	hashList, err := chain.HeightRange(minBlock, maxBlock)
2285	if err != nil {
2286		rpcsLog.Errorf("Error looking up block range: %v", err)
2287		return nil, &btcjson.RPCError{
2288			Code:    btcjson.ErrRPCDatabase,
2289			Message: "Database error: " + err.Error(),
2290		}
2291	}
2292	if lastBlock == nil || len(hashList) == 0 {
2293		return hashList, nil
2294	}
2295
2296	blk, err := chain.BlockByHash(&hashList[0])
2297	if err != nil {
2298		rpcsLog.Errorf("Error looking up possibly reorged block: %v",
2299			err)
2300		return nil, &btcjson.RPCError{
2301			Code:    btcjson.ErrRPCDatabase,
2302			Message: "Database error: " + err.Error(),
2303		}
2304	}
2305	jsonErr := descendantBlock(lastBlock, blk)
2306	if jsonErr != nil {
2307		return nil, jsonErr
2308	}
2309	return hashList, nil
2310}
2311
2312// descendantBlock returns the appropriate JSON-RPC error if a current block
2313// fetched during a reorganize is not a direct child of the parent block hash.
2314func descendantBlock(prevHash *chainhash.Hash, curBlock *btcutil.Block) error {
2315	curHash := &curBlock.MsgBlock().Header.PrevBlock
2316	if !prevHash.IsEqual(curHash) {
2317		rpcsLog.Errorf("Stopping rescan for reorged block %v "+
2318			"(replaced by block %v)", prevHash, curHash)
2319		return &ErrRescanReorg
2320	}
2321	return nil
2322}
2323
2324// scanBlockChunks executes a rescan in chunked stages. We do this to limit the
2325// amount of memory that we'll allocate to a given rescan. Every so often,
2326// we'll send back a rescan progress notification to the websockets client. The
2327// final block and block hash that we've scanned will be returned.
2328func scanBlockChunks(wsc *wsClient, cmd *btcjson.RescanCmd, lookups *rescanKeys, minBlock,
2329	maxBlock int32, chain *blockchain.BlockChain) (
2330	*btcutil.Block, *chainhash.Hash, error) {
2331
2332	// lastBlock and lastBlockHash track the previously-rescanned block.
2333	// They equal nil when no previous blocks have been rescanned.
2334	var (
2335		lastBlock     *btcutil.Block
2336		lastBlockHash *chainhash.Hash
2337	)
2338
2339	// A ticker is created to wait at least 10 seconds before notifying the
2340	// websocket client of the current progress completed by the rescan.
2341	ticker := time.NewTicker(10 * time.Second)
2342	defer ticker.Stop()
2343
2344	// Instead of fetching all block shas at once, fetch in smaller chunks
2345	// to ensure large rescans consume a limited amount of memory.
2346fetchRange:
2347	for minBlock < maxBlock {
2348		// Limit the max number of hashes to fetch at once to the
2349		// maximum number of items allowed in a single inventory.
2350		// This value could be higher since it's not creating inventory
2351		// messages, but this mirrors the limiting logic used in the
2352		// peer-to-peer protocol.
2353		maxLoopBlock := maxBlock
2354		if maxLoopBlock-minBlock > wire.MaxInvPerMsg {
2355			maxLoopBlock = minBlock + wire.MaxInvPerMsg
2356		}
2357		hashList, err := chain.HeightRange(minBlock, maxLoopBlock)
2358		if err != nil {
2359			rpcsLog.Errorf("Error looking up block range: %v", err)
2360			return nil, nil, &btcjson.RPCError{
2361				Code:    btcjson.ErrRPCDatabase,
2362				Message: "Database error: " + err.Error(),
2363			}
2364		}
2365		if len(hashList) == 0 {
2366			// The rescan is finished if no blocks hashes for this
2367			// range were successfully fetched and a stop block
2368			// was provided.
2369			if maxBlock != math.MaxInt32 {
2370				break
2371			}
2372
2373			// If the rescan is through the current block, set up
2374			// the client to continue to receive notifications
2375			// regarding all rescanned addresses and the current set
2376			// of unspent outputs.
2377			//
2378			// This is done safely by temporarily grabbing exclusive
2379			// access of the block manager.  If no more blocks have
2380			// been attached between this pause and the fetch above,
2381			// then it is safe to register the websocket client for
2382			// continuous notifications if necessary.  Otherwise,
2383			// continue the fetch loop again to rescan the new
2384			// blocks (or error due to an irrecoverable reorganize).
2385			pauseGuard := wsc.server.cfg.SyncMgr.Pause()
2386			best := wsc.server.cfg.Chain.BestSnapshot()
2387			curHash := &best.Hash
2388			again := true
2389			if lastBlockHash == nil || *lastBlockHash == *curHash {
2390				again = false
2391				n := wsc.server.ntfnMgr
2392				n.RegisterSpentRequests(wsc, lookups.unspentSlice())
2393				n.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
2394			}
2395			close(pauseGuard)
2396			if err != nil {
2397				rpcsLog.Errorf("Error fetching best block "+
2398					"hash: %v", err)
2399				return nil, nil, &btcjson.RPCError{
2400					Code: btcjson.ErrRPCDatabase,
2401					Message: "Database error: " +
2402						err.Error(),
2403				}
2404			}
2405			if again {
2406				continue
2407			}
2408			break
2409		}
2410
2411	loopHashList:
2412		for i := range hashList {
2413			blk, err := chain.BlockByHash(&hashList[i])
2414			if err != nil {
2415				// Only handle reorgs if a block could not be
2416				// found for the hash.
2417				if dbErr, ok := err.(database.Error); !ok ||
2418					dbErr.ErrorCode != database.ErrBlockNotFound {
2419
2420					rpcsLog.Errorf("Error looking up "+
2421						"block: %v", err)
2422					return nil, nil, &btcjson.RPCError{
2423						Code: btcjson.ErrRPCDatabase,
2424						Message: "Database error: " +
2425							err.Error(),
2426					}
2427				}
2428
2429				// If an absolute max block was specified, don't
2430				// attempt to handle the reorg.
2431				if maxBlock != math.MaxInt32 {
2432					rpcsLog.Errorf("Stopping rescan for "+
2433						"reorged block %v",
2434						cmd.EndBlock)
2435					return nil, nil, &ErrRescanReorg
2436				}
2437
2438				// If the lookup for the previously valid block
2439				// hash failed, there may have been a reorg.
2440				// Fetch a new range of block hashes and verify
2441				// that the previously processed block (if there
2442				// was any) still exists in the database.  If it
2443				// doesn't, we error.
2444				//
2445				// A goto is used to branch executation back to
2446				// before the range was evaluated, as it must be
2447				// reevaluated for the new hashList.
2448				minBlock += int32(i)
2449				hashList, err = recoverFromReorg(
2450					chain, minBlock, maxBlock, lastBlockHash,
2451				)
2452				if err != nil {
2453					return nil, nil, err
2454				}
2455				if len(hashList) == 0 {
2456					break fetchRange
2457				}
2458				goto loopHashList
2459			}
2460			if i == 0 && lastBlockHash != nil {
2461				// Ensure the new hashList is on the same fork
2462				// as the last block from the old hashList.
2463				jsonErr := descendantBlock(lastBlockHash, blk)
2464				if jsonErr != nil {
2465					return nil, nil, jsonErr
2466				}
2467			}
2468
2469			// A select statement is used to stop rescans if the
2470			// client requesting the rescan has disconnected.
2471			select {
2472			case <-wsc.quit:
2473				rpcsLog.Debugf("Stopped rescan at height %v "+
2474					"for disconnected client", blk.Height())
2475				return nil, nil, nil
2476			default:
2477				rescanBlock(wsc, lookups, blk)
2478				lastBlock = blk
2479				lastBlockHash = blk.Hash()
2480			}
2481
2482			// Periodically notify the client of the progress
2483			// completed.  Continue with next block if no progress
2484			// notification is needed yet.
2485			select {
2486			case <-ticker.C: // fallthrough
2487			default:
2488				continue
2489			}
2490
2491			n := btcjson.NewRescanProgressNtfn(
2492				hashList[i].String(), blk.Height(),
2493				blk.MsgBlock().Header.Timestamp.Unix(),
2494			)
2495			mn, err := btcjson.MarshalCmd(nil, n)
2496			if err != nil {
2497				rpcsLog.Errorf("Failed to marshal rescan "+
2498					"progress notification: %v", err)
2499				continue
2500			}
2501
2502			if err = wsc.QueueNotification(mn); err == ErrClientQuit {
2503				// Finished if the client disconnected.
2504				rpcsLog.Debugf("Stopped rescan at height %v "+
2505					"for disconnected client", blk.Height())
2506				return nil, nil, nil
2507			}
2508		}
2509
2510		minBlock += int32(len(hashList))
2511	}
2512
2513	return lastBlock, lastBlockHash, nil
2514}
2515
2516// handleRescan implements the rescan command extension for websocket
2517// connections.
2518//
2519// NOTE: This does not smartly handle reorgs, and fixing requires database
2520// changes (for safe, concurrent access to full block ranges, and support
2521// for other chains than the best chain).  It will, however, detect whether
2522// a reorg removed a block that was previously processed, and result in the
2523// handler erroring.  Clients must handle this by finding a block still in
2524// the chain (perhaps from a rescanprogress notification) to resume their
2525// rescan.
2526func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) {
2527	cmd, ok := icmd.(*btcjson.RescanCmd)
2528	if !ok {
2529		return nil, btcjson.ErrRPCInternal
2530	}
2531
2532	outpoints := make([]*wire.OutPoint, 0, len(cmd.OutPoints))
2533	for i := range cmd.OutPoints {
2534		cmdOutpoint := &cmd.OutPoints[i]
2535		blockHash, err := chainhash.NewHashFromStr(cmdOutpoint.Hash)
2536		if err != nil {
2537			return nil, rpcDecodeHexError(cmdOutpoint.Hash)
2538		}
2539		outpoint := wire.NewOutPoint(blockHash, cmdOutpoint.Index)
2540		outpoints = append(outpoints, outpoint)
2541	}
2542
2543	numAddrs := len(cmd.Addresses)
2544	if numAddrs == 1 {
2545		rpcsLog.Info("Beginning rescan for 1 address")
2546	} else {
2547		rpcsLog.Infof("Beginning rescan for %d addresses", numAddrs)
2548	}
2549
2550	// Build lookup maps.
2551	lookups := rescanKeys{
2552		addrs:   map[string]struct{}{},
2553		unspent: map[wire.OutPoint]struct{}{},
2554	}
2555	for _, addrStr := range cmd.Addresses {
2556		lookups.addrs[addrStr] = struct{}{}
2557	}
2558	for _, outpoint := range outpoints {
2559		lookups.unspent[*outpoint] = struct{}{}
2560	}
2561
2562	chain := wsc.server.cfg.Chain
2563
2564	minBlockHash, err := chainhash.NewHashFromStr(cmd.BeginBlock)
2565	if err != nil {
2566		return nil, rpcDecodeHexError(cmd.BeginBlock)
2567	}
2568	minBlock, err := chain.BlockHeightByHash(minBlockHash)
2569	if err != nil {
2570		return nil, &btcjson.RPCError{
2571			Code:    btcjson.ErrRPCBlockNotFound,
2572			Message: "Error getting block: " + err.Error(),
2573		}
2574	}
2575
2576	maxBlock := int32(math.MaxInt32)
2577	if cmd.EndBlock != nil {
2578		maxBlockHash, err := chainhash.NewHashFromStr(*cmd.EndBlock)
2579		if err != nil {
2580			return nil, rpcDecodeHexError(*cmd.EndBlock)
2581		}
2582		maxBlock, err = chain.BlockHeightByHash(maxBlockHash)
2583		if err != nil {
2584			return nil, &btcjson.RPCError{
2585				Code:    btcjson.ErrRPCBlockNotFound,
2586				Message: "Error getting block: " + err.Error(),
2587			}
2588		}
2589	}
2590
2591	var (
2592		lastBlock     *btcutil.Block
2593		lastBlockHash *chainhash.Hash
2594	)
2595	if len(lookups.addrs) != 0 || len(lookups.unspent) != 0 {
2596		// With all the arguments parsed, we'll execute our chunked rescan
2597		// which will notify the clients of any address deposits or output
2598		// spends.
2599		lastBlock, lastBlockHash, err = scanBlockChunks(
2600			wsc, cmd, &lookups, minBlock, maxBlock, chain,
2601		)
2602		if err != nil {
2603			return nil, err
2604		}
2605
2606		// If the last block is nil, then this means that the client
2607		// disconnected mid-rescan. As a result, we don't need to send
2608		// anything back to them.
2609		if lastBlock == nil {
2610			return nil, nil
2611		}
2612	} else {
2613		rpcsLog.Infof("Skipping rescan as client has no addrs/utxos")
2614
2615		// If we didn't actually do a rescan, then we'll give the
2616		// client our best known block within the final rescan finished
2617		// notification.
2618		chainTip := chain.BestSnapshot()
2619		lastBlockHash = &chainTip.Hash
2620		lastBlock, err = chain.BlockByHash(lastBlockHash)
2621		if err != nil {
2622			return nil, &btcjson.RPCError{
2623				Code:    btcjson.ErrRPCBlockNotFound,
2624				Message: "Error getting block: " + err.Error(),
2625			}
2626		}
2627	}
2628
2629	// Notify websocket client of the finished rescan.  Due to how btcd
2630	// asynchronously queues notifications to not block calling code,
2631	// there is no guarantee that any of the notifications created during
2632	// rescan (such as rescanprogress, recvtx and redeemingtx) will be
2633	// received before the rescan RPC returns.  Therefore, another method
2634	// is needed to safely inform clients that all rescan notifications have
2635	// been sent.
2636	n := btcjson.NewRescanFinishedNtfn(
2637		lastBlockHash.String(), lastBlock.Height(),
2638		lastBlock.MsgBlock().Header.Timestamp.Unix(),
2639	)
2640	if mn, err := btcjson.MarshalCmd(nil, n); err != nil {
2641		rpcsLog.Errorf("Failed to marshal rescan finished "+
2642			"notification: %v", err)
2643	} else {
2644		// The rescan is finished, so we don't care whether the client
2645		// has disconnected at this point, so discard error.
2646		_ = wsc.QueueNotification(mn)
2647	}
2648
2649	rpcsLog.Info("Finished rescan")
2650	return nil, nil
2651}
2652
2653func init() {
2654	wsHandlers = wsHandlersBeforeInit
2655}
2656