1// Copyright (c) 2017 The btcsuite developers
2// Use of this source code is governed by an ISC
3// license that can be found in the LICENSE file.
4
5package main
6
7import (
8	"sync/atomic"
9
10	"github.com/btcsuite/btcd/blockchain"
11	"github.com/btcsuite/btcd/chaincfg/chainhash"
12	"github.com/btcsuite/btcd/mempool"
13	"github.com/btcsuite/btcd/netsync"
14	"github.com/btcsuite/btcd/peer"
15	"github.com/btcsuite/btcd/wire"
16	"github.com/btcsuite/btcutil"
17)
18
19// rpcPeer provides a peer for use with the RPC server and implements the
20// rpcserverPeer interface.
21type rpcPeer serverPeer
22
23// Ensure rpcPeer implements the rpcserverPeer interface.
24var _ rpcserverPeer = (*rpcPeer)(nil)
25
26// ToPeer returns the underlying peer instance.
27//
28// This function is safe for concurrent access and is part of the rpcserverPeer
29// interface implementation.
30func (p *rpcPeer) ToPeer() *peer.Peer {
31	if p == nil {
32		return nil
33	}
34	return (*serverPeer)(p).Peer
35}
36
37// IsTxRelayDisabled returns whether or not the peer has disabled transaction
38// relay.
39//
40// This function is safe for concurrent access and is part of the rpcserverPeer
41// interface implementation.
42func (p *rpcPeer) IsTxRelayDisabled() bool {
43	return (*serverPeer)(p).disableRelayTx
44}
45
46// BanScore returns the current integer value that represents how close the peer
47// is to being banned.
48//
49// This function is safe for concurrent access and is part of the rpcserverPeer
50// interface implementation.
51func (p *rpcPeer) BanScore() uint32 {
52	return (*serverPeer)(p).banScore.Int()
53}
54
55// FeeFilter returns the requested current minimum fee rate for which
56// transactions should be announced.
57//
58// This function is safe for concurrent access and is part of the rpcserverPeer
59// interface implementation.
60func (p *rpcPeer) FeeFilter() int64 {
61	return atomic.LoadInt64(&(*serverPeer)(p).feeFilter)
62}
63
64// rpcConnManager provides a connection manager for use with the RPC server and
65// implements the rpcserverConnManager interface.
66type rpcConnManager struct {
67	server *server
68}
69
70// Ensure rpcConnManager implements the rpcserverConnManager interface.
71var _ rpcserverConnManager = &rpcConnManager{}
72
73// Connect adds the provided address as a new outbound peer.  The permanent flag
74// indicates whether or not to make the peer persistent and reconnect if the
75// connection is lost.  Attempting to connect to an already existing peer will
76// return an error.
77//
78// This function is safe for concurrent access and is part of the
79// rpcserverConnManager interface implementation.
80func (cm *rpcConnManager) Connect(addr string, permanent bool) error {
81	replyChan := make(chan error)
82	cm.server.query <- connectNodeMsg{
83		addr:      addr,
84		permanent: permanent,
85		reply:     replyChan,
86	}
87	return <-replyChan
88}
89
90// RemoveByID removes the peer associated with the provided id from the list of
91// persistent peers.  Attempting to remove an id that does not exist will return
92// an error.
93//
94// This function is safe for concurrent access and is part of the
95// rpcserverConnManager interface implementation.
96func (cm *rpcConnManager) RemoveByID(id int32) error {
97	replyChan := make(chan error)
98	cm.server.query <- removeNodeMsg{
99		cmp:   func(sp *serverPeer) bool { return sp.ID() == id },
100		reply: replyChan,
101	}
102	return <-replyChan
103}
104
105// RemoveByAddr removes the peer associated with the provided address from the
106// list of persistent peers.  Attempting to remove an address that does not
107// exist will return an error.
108//
109// This function is safe for concurrent access and is part of the
110// rpcserverConnManager interface implementation.
111func (cm *rpcConnManager) RemoveByAddr(addr string) error {
112	replyChan := make(chan error)
113	cm.server.query <- removeNodeMsg{
114		cmp:   func(sp *serverPeer) bool { return sp.Addr() == addr },
115		reply: replyChan,
116	}
117	return <-replyChan
118}
119
120// DisconnectByID disconnects the peer associated with the provided id.  This
121// applies to both inbound and outbound peers.  Attempting to remove an id that
122// does not exist will return an error.
123//
124// This function is safe for concurrent access and is part of the
125// rpcserverConnManager interface implementation.
126func (cm *rpcConnManager) DisconnectByID(id int32) error {
127	replyChan := make(chan error)
128	cm.server.query <- disconnectNodeMsg{
129		cmp:   func(sp *serverPeer) bool { return sp.ID() == id },
130		reply: replyChan,
131	}
132	return <-replyChan
133}
134
135// DisconnectByAddr disconnects the peer associated with the provided address.
136// This applies to both inbound and outbound peers.  Attempting to remove an
137// address that does not exist will return an error.
138//
139// This function is safe for concurrent access and is part of the
140// rpcserverConnManager interface implementation.
141func (cm *rpcConnManager) DisconnectByAddr(addr string) error {
142	replyChan := make(chan error)
143	cm.server.query <- disconnectNodeMsg{
144		cmp:   func(sp *serverPeer) bool { return sp.Addr() == addr },
145		reply: replyChan,
146	}
147	return <-replyChan
148}
149
150// ConnectedCount returns the number of currently connected peers.
151//
152// This function is safe for concurrent access and is part of the
153// rpcserverConnManager interface implementation.
154func (cm *rpcConnManager) ConnectedCount() int32 {
155	return cm.server.ConnectedCount()
156}
157
158// NetTotals returns the sum of all bytes received and sent across the network
159// for all peers.
160//
161// This function is safe for concurrent access and is part of the
162// rpcserverConnManager interface implementation.
163func (cm *rpcConnManager) NetTotals() (uint64, uint64) {
164	return cm.server.NetTotals()
165}
166
167// ConnectedPeers returns an array consisting of all connected peers.
168//
169// This function is safe for concurrent access and is part of the
170// rpcserverConnManager interface implementation.
171func (cm *rpcConnManager) ConnectedPeers() []rpcserverPeer {
172	replyChan := make(chan []*serverPeer)
173	cm.server.query <- getPeersMsg{reply: replyChan}
174	serverPeers := <-replyChan
175
176	// Convert to RPC server peers.
177	peers := make([]rpcserverPeer, 0, len(serverPeers))
178	for _, sp := range serverPeers {
179		peers = append(peers, (*rpcPeer)(sp))
180	}
181	return peers
182}
183
184// PersistentPeers returns an array consisting of all the added persistent
185// peers.
186//
187// This function is safe for concurrent access and is part of the
188// rpcserverConnManager interface implementation.
189func (cm *rpcConnManager) PersistentPeers() []rpcserverPeer {
190	replyChan := make(chan []*serverPeer)
191	cm.server.query <- getAddedNodesMsg{reply: replyChan}
192	serverPeers := <-replyChan
193
194	// Convert to generic peers.
195	peers := make([]rpcserverPeer, 0, len(serverPeers))
196	for _, sp := range serverPeers {
197		peers = append(peers, (*rpcPeer)(sp))
198	}
199	return peers
200}
201
202// BroadcastMessage sends the provided message to all currently connected peers.
203//
204// This function is safe for concurrent access and is part of the
205// rpcserverConnManager interface implementation.
206func (cm *rpcConnManager) BroadcastMessage(msg wire.Message) {
207	cm.server.BroadcastMessage(msg)
208}
209
210// AddRebroadcastInventory adds the provided inventory to the list of
211// inventories to be rebroadcast at random intervals until they show up in a
212// block.
213//
214// This function is safe for concurrent access and is part of the
215// rpcserverConnManager interface implementation.
216func (cm *rpcConnManager) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
217	cm.server.AddRebroadcastInventory(iv, data)
218}
219
220// RelayTransactions generates and relays inventory vectors for all of the
221// passed transactions to all connected peers.
222func (cm *rpcConnManager) RelayTransactions(txns []*mempool.TxDesc) {
223	cm.server.relayTransactions(txns)
224}
225
226// rpcSyncMgr provides a block manager for use with the RPC server and
227// implements the rpcserverSyncManager interface.
228type rpcSyncMgr struct {
229	server  *server
230	syncMgr *netsync.SyncManager
231}
232
233// Ensure rpcSyncMgr implements the rpcserverSyncManager interface.
234var _ rpcserverSyncManager = (*rpcSyncMgr)(nil)
235
236// IsCurrent returns whether or not the sync manager believes the chain is
237// current as compared to the rest of the network.
238//
239// This function is safe for concurrent access and is part of the
240// rpcserverSyncManager interface implementation.
241func (b *rpcSyncMgr) IsCurrent() bool {
242	return b.syncMgr.IsCurrent()
243}
244
245// SubmitBlock submits the provided block to the network after processing it
246// locally.
247//
248// This function is safe for concurrent access and is part of the
249// rpcserverSyncManager interface implementation.
250func (b *rpcSyncMgr) SubmitBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
251	return b.syncMgr.ProcessBlock(block, flags)
252}
253
254// Pause pauses the sync manager until the returned channel is closed.
255//
256// This function is safe for concurrent access and is part of the
257// rpcserverSyncManager interface implementation.
258func (b *rpcSyncMgr) Pause() chan<- struct{} {
259	return b.syncMgr.Pause()
260}
261
262// SyncPeerID returns the peer that is currently the peer being used to sync
263// from.
264//
265// This function is safe for concurrent access and is part of the
266// rpcserverSyncManager interface implementation.
267func (b *rpcSyncMgr) SyncPeerID() int32 {
268	return b.syncMgr.SyncPeerID()
269}
270
271// LocateBlocks returns the hashes of the blocks after the first known block in
272// the provided locators until the provided stop hash or the current tip is
273// reached, up to a max of wire.MaxBlockHeadersPerMsg hashes.
274//
275// This function is safe for concurrent access and is part of the
276// rpcserverSyncManager interface implementation.
277func (b *rpcSyncMgr) LocateHeaders(locators []*chainhash.Hash, hashStop *chainhash.Hash) []wire.BlockHeader {
278	return b.server.chain.LocateHeaders(locators, hashStop)
279}
280