// SPDX-License-Identifier: ISC
// Copyright (c) 2014-2020 Bitmark Inc.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package peer

import (
	"encoding/binary"

	"github.com/bitmark-inc/bitmarkd/announce"
	"github.com/bitmark-inc/bitmarkd/asset"
	"github.com/bitmark-inc/bitmarkd/fault"
	"github.com/bitmark-inc/bitmarkd/messagebus"
	"github.com/bitmark-inc/bitmarkd/mode"
	"github.com/bitmark-inc/bitmarkd/pay"
	"github.com/bitmark-inc/bitmarkd/payment"
	"github.com/bitmark-inc/bitmarkd/reservoir"
	"github.com/bitmark-inc/bitmarkd/storage"
	"github.com/bitmark-inc/bitmarkd/transactionrecord"
	"github.com/bitmark-inc/logger"
)

// processSubscription dispatches data received from a subscribed peer according to its command
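//
// the expected argument layout (taken from the cases below) is:
//   block, assets, issues, transfer, proof: arguments[0] is the packed payload
//   rpc:  arguments[0] fingerprint, arguments[1] connection data, arguments[2] 8-byte big-endian timestamp
//   peer: arguments[0] peer identity, arguments[1] listener data, arguments[2] 8-byte big-endian timestamp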
func processSubscription(log *logger.L, command string, arguments [][]byte) {

	dataLength := len(arguments)
	switch command {
	case "block":
		if dataLength < 1 {
			log.Debugf("block with too few data: %d items", dataLength)
			return
		}
		log.Infof("received block: %x", arguments[0])
		if !mode.Is(mode.Normal) {
			err := fault.NotAvailableDuringSynchronise
			log.Debugf("failed block: error: %s", err)
		} else {
			messagebus.Bus.Blockstore.Send("remote", arguments[0])
		}

	case "assets":
		if dataLength < 1 {
			log.Debugf("assets with too few data: %d items", dataLength)
			return
		}
		log.Infof("received assets: %x", arguments[0])
		err := processAssets(arguments[0])
		if nil != err {
			log.Debugf("failed assets: error: %s", err)
		} else {
			messagebus.Bus.Broadcast.Send("assets", arguments[0])
		}

	case "issues":
		if dataLength < 1 {
			log.Debugf("issues with too few data: %d items", dataLength)
			return
		}
		log.Infof("received issues: %x", arguments[0])
		err := processIssues(arguments[0])
		if nil != err {
			log.Debugf("failed issues: error: %s", err)
		} else {
			messagebus.Bus.Broadcast.Send("issues", arguments[0])
		}

	case "transfer":
		if dataLength < 1 {
			log.Debugf("transfer with too few data: %d items", dataLength)
			return
		}
		log.Infof("received transfer: %x", arguments[0])
		err := processTransfer(arguments[0])
		if nil != err {
			log.Debugf("failed transfer: error: %s", err)
		} else {
			messagebus.Bus.Broadcast.Send("transfer", arguments[0])
		}

	case "proof":
		if dataLength < 1 {
			log.Debugf("proof with too few data: %d items", dataLength)
			return
		}
		log.Infof("received proof: %x", arguments[0])
		err := processProof(arguments[0])
		if nil != err {
			log.Debugf("failed proof: error: %s", err)
		} else {
			messagebus.Bus.Broadcast.Send("proof", arguments[0])
		}

	case "rpc":
		if dataLength < 3 {
			log.Debugf("rpc with too few data: %d items", dataLength)
			return
		}
		if 8 != len(arguments[2]) {
			log.Debug("rpc with invalid timestamp")
			return
		}
		timestamp := binary.BigEndian.Uint64(arguments[2])
		log.Infof("received rpc: fingerprint: %x  rpc: %x  timestamp: %d", arguments[0], arguments[1], timestamp)
		if announce.AddRPC(arguments[0], arguments[1], timestamp) {
			messagebus.Bus.Broadcast.Send("rpc", arguments[0:3]...)
		}

	case "peer":
		if dataLength < 3 {
			log.Debugf("peer with too few data: %d items", dataLength)
			return
		}
		if 8 != len(arguments[2]) {
			log.Debug("peer with invalid timestamp")
			return
		}
		timestamp := binary.BigEndian.Uint64(arguments[2])
		log.Infof("received peer: %x  listener: %x  timestamp: %d", arguments[0], arguments[1], timestamp)
		if announce.AddPeer(arguments[0], arguments[1], timestamp) {
			messagebus.Bus.Broadcast.Send("peer", arguments[0:3]...)
		}

	default:
		log.Debugf("received unhandled command: %q arguments: %x", command, arguments)

	}
}
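
// The helper below is an illustrative sketch only, not part of the original
// bitmarkd handler: it shows, under the frame layout described above, how a
// caller could assemble the three frames of a "peer" announcement and hand
// them to processSubscription. The function name and parameters are
// hypothetical.
func examplePeerAnnouncement(log *logger.L, publicKey []byte, listeners []byte, seconds uint64) {
	// the third frame must be exactly 8 bytes, big-endian,
	// or the "peer" case above rejects the message
	timestamp := make([]byte, 8)
	binary.BigEndian.PutUint64(timestamp, seconds)

	processSubscription(log, "peer", [][]byte{publicKey, listeners, timestamp})
}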

// unpack the received asset records and cache them
func processAssets(packed []byte) error {

	if 0 == len(packed) {
		return fault.MissingParameters
	}

	if !mode.Is(mode.Normal) {
		return fault.NotAvailableDuringSynchronise
	}

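	// the payload is a concatenation of packed records; Unpack reports how
	// many bytes each record consumed so the stream can be advanced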
	ok := false
	for 0 != len(packed) {
		transaction, n, err := transactionrecord.Packed(packed).Unpack(mode.IsTesting())
		if nil != err {
			return err
		}

		switch tx := transaction.(type) {
		case *transactionrecord.AssetData:
			_, packedAsset, err := asset.Cache(tx, storage.Pool.Assets)
			if nil != err {
				return err
			}
			if nil != packedAsset {
				ok = true
			}

		default:
			return fault.TransactionIsNotAnAsset
		}
		packed = packed[n:]
	}

	// all items were duplicates
	if !ok {
		return fault.NoNewTransactions
	}
	return nil
}

// unpack the received issue records and cache them in the reservoir
func processIssues(packed []byte) error {

	if 0 == len(packed) {
		return fault.MissingParameters
	}

	if !mode.Is(mode.Normal) {
		return fault.NotAvailableDuringSynchronise
	}

	packedIssues := transactionrecord.Packed(packed)
	issueCount := 0 // for payment difficulty

	issues := make([]*transactionrecord.BitmarkIssue, 0, reservoir.MaximumIssues)
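	// collect the unpacked issues into a single batch; any record that is
	// not a bitmark issue rejects the whole payload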
	for 0 != len(packedIssues) {
		transaction, n, err := packedIssues.Unpack(mode.IsTesting())
		if nil != err {
			return err
		}

		switch tx := transaction.(type) {
		case *transactionrecord.BitmarkIssue:
			issues = append(issues, tx)
			issueCount += 1
		default:
			return fault.TransactionIsNotAnIssue
		}
		packedIssues = packedIssues[n:]
	}
	if 0 == len(issues) {
		return fault.MissingParameters
	}

	rsvr := reservoir.Get()
	_, duplicate, err := rsvr.StoreIssues(issues)
	if nil != err {
		return err
	}

	if duplicate {
		return fault.TransactionAlreadyExists
	}

	return nil
}

// unpack transfer and process it
func processTransfer(packed []byte) error {

	if 0 == len(packed) {
		return fault.MissingParameters
	}

	if !mode.Is(mode.Normal) {
		return fault.NotAvailableDuringSynchronise
	}

	transaction, _, err := transactionrecord.Packed(packed).Unpack(mode.IsTesting())
	if nil != err {
		return err
	}

	duplicate := false

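	// records that satisfy the transactionrecord.BitmarkTransfer interface are
	// stored as ordinary transfers; share grants and swaps are handled in the
	// type switch below, and anything else is rejected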
	transfer, ok := transaction.(transactionrecord.BitmarkTransfer)

	rsvr := reservoir.Get()
	if ok {

		_, duplicate, err = rsvr.StoreTransfer(transfer)

	} else {
		switch tx := transaction.(type) {

		case *transactionrecord.ShareGrant:
			_, duplicate, err = rsvr.StoreGrant(tx)

		case *transactionrecord.ShareSwap:
			_, duplicate, err = rsvr.StoreSwap(tx)

		default:
			return fault.TransactionIsNotATransfer
		}
	}

	if nil != err {
		return err
	}

	if duplicate {
		return fault.TransactionAlreadyExists
	}

	return nil
}

// process a proof payload (pay id followed by nonce)
func processProof(packed []byte) error {

	if 0 == len(packed) {
		return fault.MissingParameters
	}

	if !mode.Is(mode.Normal) {
		return fault.NotAvailableDuringSynchronise
	}

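	// the payload is a fixed-length pay id immediately followed by the
	// submitted nonce; the nonce length is validated before copying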
	var payId pay.PayId
	nonceLength := len(packed) - len(payId) // could be negative
	if nonceLength < payment.MinimumNonceLength || nonceLength > payment.MaximumNonceLength {
		return fault.InvalidNonce
	}

	copy(payId[:], packed[:len(payId)])
	nonce := packed[len(payId):]
	rsvr := reservoir.Get()
	status := rsvr.TryProof(payId, nonce)
	if reservoir.TrackingAccepted != status {
		// pay id already processed or was invalid
		return fault.PayIdAlreadyUsed
	}

	return nil
}
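
// The helper below is an illustrative sketch only, not part of the original
// bitmarkd handler: it shows the payload layout processProof expects, namely
// a pay.PayId immediately followed by the nonce bytes. The function name is
// hypothetical.
func exampleProofPayload(payId pay.PayId, nonce []byte) []byte {
	// pay id first, nonce directly after, no separator or length prefix
	packed := make([]byte, 0, len(payId)+len(nonce))
	packed = append(packed, payId[:]...)
	packed = append(packed, nonce...)
	return packed
}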