1// SPDX-License-Identifier: ISC
2// Copyright (c) 2014-2020 Bitmark Inc.
3// Use of this source code is governed by an ISC
4// license that can be found in the LICENSE file.
5
6package reservoir
7
8import (
9	"path"
10	"sync"
11	"time"
12
13	"github.com/bitmark-inc/bitmarkd/account"
14	"github.com/bitmark-inc/bitmarkd/background"
15	"github.com/bitmark-inc/bitmarkd/blockrecord"
16	"github.com/bitmark-inc/bitmarkd/currency"
17	"github.com/bitmark-inc/bitmarkd/difficulty"
18	"github.com/bitmark-inc/bitmarkd/fault"
19	"github.com/bitmark-inc/bitmarkd/merkle"
20	"github.com/bitmark-inc/bitmarkd/ownership"
21	"github.com/bitmark-inc/bitmarkd/pay"
22	"github.com/bitmark-inc/bitmarkd/storage"
23	"github.com/bitmark-inc/bitmarkd/transactionrecord"
24	"github.com/bitmark-inc/logger"
25)
26
// MaximumIssues - limiting constant: maximum allowable issues per block
const MaximumIssues = 100
31
32// internal limiting constants
33const (
34	maximumPendingFreeIssues   = blockrecord.MaximumTransactions * 2
35	maximumPendingPaidIssues   = blockrecord.MaximumTransactions * 2
36	maximumPendingTransactions = blockrecord.MaximumTransactions * 16
37)
38
// file name of the reservoir cache; Initialise joins it onto the
// cache directory to form the full cache file path
const reservoirFile = "reservoir.cache"
41
// transactionData - a single transaction of any type, kept together
// with its id and its packed bytes
type transactionData struct {
	txId        merkle.Digest                 // transaction id
	transaction transactionrecord.Transaction // unpacked transaction
	packed      transactionrecord.Packed      // transaction bytes
}
48
// transactionPaymentData - a single transaction together with the
// payment it requires; held in the pendingTransactions map
//
// key: pay id
type transactionPaymentData struct {
	tx        *transactionData                       // record on this pay id
	payId     pay.PayId                              // for payment matching
	payments  []transactionrecord.PaymentAlternative // required payment
	expiresAt time.Time                              // only used in pending state
}
56
// issuePaymentData - a group of issues sharing one pay id together
// with the payment required; held in the pendingPaidIssues and
// verifiedPaidIssues maps
//
// key: pay id
type issuePaymentData struct {
	txs       []*transactionData                     // all records on this pay id
	payId     pay.PayId                              // for payment matching
	payments  []transactionrecord.PaymentAlternative // issue existing asset, or other records
	expiresAt time.Time                              // only used in pending state
}
64
// issueFreeData - a group of free issues sharing one pay id together
// with the nonce and difficulty of their proof; held in the
// pendingFreeIssues and verifiedFreeIssues maps
//
// key: pay id
type issueFreeData struct {
	txs        []*transactionData     // all records on this pay id
	payId      pay.PayId              // for payment matching
	nonce      PayNonce               // only free issue, client nonce from successful try proof RPC
	difficulty *difficulty.Difficulty // only free issue, to test client nonce
	expiresAt  time.Time              // only used in pending state
}
73
// PaymentDetail - a payment record for a single currency
//
// acceptablePayment compares the Currency and Amounts of a detail
// against the payment alternatives stored with a pending record
type PaymentDetail struct {
	Currency currency.Currency // code number
	TxID     string            // tx id on currency blockchain
	Amounts  map[string]uint64 // address(Base58) → value(Satoshis)
}
80
// spendKey - key type of the spend map used to track the shares:
// an (owner, share) pair
type spendKey struct {
	owner [64]byte
	share merkle.Digest
}
86
// Handles - storage handles used when restoring from the cache file
//
// the set is supplied to Initialise and kept in the global data; the
// globalDataType methods pass individual handles on to the store and
// balance functions
type Handles struct {
	Assets            storage.Handle
	BlockOwnerPayment storage.Handle
	Blocks            storage.Handle
	Transactions      storage.Handle
	OwnerTxIndex      storage.Handle
	OwnerData         storage.Handle
	Shares            storage.Handle
	ShareQuantity     storage.Handle
}
98
// globalDataType - all state of the reservoir, guarded by the
// embedded read/write lock
type globalDataType struct {
	sync.RWMutex

	// to prevent fetch during critical operations
	// (cleared by Disable, set by Enable and Initialise)
	enabled bool

	// full path of the cache file, built by Initialise
	filename string

	log *logger.L

	// handle on the background processes started by Initialise
	background *background.T

	// separate verified pools
	verifiedTransactions map[pay.PayId]*transactionData  // normal transactions
	verifiedFreeIssues   map[pay.PayId]*issueFreeData    // so proof can be recreated
	verifiedPaidIssues   map[pay.PayId]*issuePaymentData // so block can be confirmed as a whole
	verifiedIndex        map[merkle.Digest]pay.PayId     // tx id → pay id

	// Link -> TxId to check for double spend
	inProgressLinks map[merkle.Digest]merkle.Digest

	// separate pending pools
	pendingTransactions map[pay.PayId]*transactionPaymentData
	pendingFreeIssues   map[pay.PayId]*issueFreeData
	pendingPaidIssues   map[pay.PayId]*issuePaymentData
	pendingIndex        map[merkle.Digest]pay.PayId // tx id → pay id

	// number of individual issue records in the pending free and
	// pending paid pools (reduced by len(txs) when an entry leaves)
	pendingFreeCount int
	pendingPaidCount int

	// payments that are valid but have no pending record
	// ***** FIX THIS: need to expire
	orphanPayments map[pay.PayId]*PaymentDetail

	// tracking the shares
	spend map[spendKey]uint64

	// set once during initialise
	initialised bool

	// storage handles supplied to Initialise
	handles Handles
}
141
// globals as a struct to allow lock
// (the single package-wide instance; its maps are created by Initialise)
var globalData globalDataType
144
145func (g *globalDataType) StoreTransfer(transfer transactionrecord.BitmarkTransfer) (*TransferInfo, bool, error) {
146	return storeTransfer(
147		transfer,
148		g.handles.Transactions,
149		g.handles.OwnerTxIndex,
150		g.handles.OwnerData,
151		g.handles.BlockOwnerPayment,
152	)
153}
154
155func (g *globalDataType) StoreIssues(issues []*transactionrecord.BitmarkIssue) (*IssueInfo, bool, error) {
156	return storeIssues(
157		issues,
158		g.handles.Assets,
159		g.handles.BlockOwnerPayment,
160	)
161}
162
163func (g *globalDataType) TryProof(payID pay.PayId, clientNonce []byte) TrackingStatus {
164	return tryProof(payID, clientNonce)
165}
166
167func (g *globalDataType) TransactionStatus(txID merkle.Digest) TransactionState {
168	return transactionStatus(txID)
169}
170
171func (g *globalDataType) ShareBalance(owner *account.Account, startSharedID merkle.Digest, count int) ([]BalanceInfo, error) {
172	return shareBalance(owner, startSharedID, count, g.handles.ShareQuantity)
173}
174
175func (g *globalDataType) StoreGrant(grant *transactionrecord.ShareGrant) (*GrantInfo, bool, error) {
176	return storeGrant(
177		grant,
178		g.handles.ShareQuantity,
179		g.handles.Shares,
180		g.handles.OwnerData,
181		g.handles.BlockOwnerPayment,
182		g.handles.Transactions,
183	)
184}
185
186func (g *globalDataType) StoreSwap(swap *transactionrecord.ShareSwap) (*SwapInfo, bool, error) {
187	return storeSwap(
188		swap,
189		g.handles.ShareQuantity,
190		g.handles.Shares,
191		g.handles.OwnerData,
192		g.handles.BlockOwnerPayment,
193	)
194}
195
196// Reservoir - APIs
197type Reservoir interface {
198	StoreTransfer(transactionrecord.BitmarkTransfer) (*TransferInfo, bool, error)
199	StoreIssues(issues []*transactionrecord.BitmarkIssue) (*IssueInfo, bool, error)
200	TryProof(pay.PayId, []byte) TrackingStatus
201	TransactionStatus(merkle.Digest) TransactionState
202	ShareBalance(*account.Account, merkle.Digest, int) ([]BalanceInfo, error)
203	StoreGrant(*transactionrecord.ShareGrant) (*GrantInfo, bool, error)
204	StoreSwap(swap *transactionrecord.ShareSwap) (*SwapInfo, bool, error)
205}
206
207// Get - return reservoir APIs
208func Get() Reservoir {
209	if globalData.initialised {
210		return &globalData
211	}
212
213	return nil
214}
215
216// Initialise - create the cache
217func Initialise(cacheDirectory string, handles Handles) error {
218	globalData.Lock()
219	defer globalData.Unlock()
220
221	// no need to start if already started
222	if globalData.initialised {
223		return fault.AlreadyInitialised
224	}
225
226	globalData.log = logger.New("reservoir")
227	globalData.log.Info("starting…")
228
229	globalData.inProgressLinks = make(map[merkle.Digest]merkle.Digest)
230
231	globalData.verifiedTransactions = make(map[pay.PayId]*transactionData)
232	globalData.verifiedFreeIssues = make(map[pay.PayId]*issueFreeData)
233	globalData.verifiedPaidIssues = make(map[pay.PayId]*issuePaymentData)
234	globalData.verifiedIndex = make(map[merkle.Digest]pay.PayId)
235
236	globalData.pendingTransactions = make(map[pay.PayId]*transactionPaymentData)
237	globalData.pendingFreeIssues = make(map[pay.PayId]*issueFreeData)
238	globalData.pendingPaidIssues = make(map[pay.PayId]*issuePaymentData)
239	globalData.pendingIndex = make(map[merkle.Digest]pay.PayId)
240
241	globalData.pendingFreeCount = 0
242	globalData.pendingPaidCount = 0
243
244	globalData.orphanPayments = make(map[pay.PayId]*PaymentDetail)
245
246	globalData.spend = make(map[spendKey]uint64)
247
248	globalData.enabled = true
249
250	globalData.filename = path.Join(cacheDirectory, reservoirFile)
251
252	globalData.handles = handles
253
254	// all data initialised
255	globalData.initialised = true
256
257	globalData.log.Debugf("load from file: %s", globalData.filename)
258
259	// start background processes
260	globalData.log.Info("start background…")
261
262	processes := background.Processes{
263		&rebroadcaster{},
264		&cleaner{},
265	}
266
267	globalData.background = background.Start(processes, nil)
268
269	return nil
270}
271
272// Finalise - stop all background processes
273func Finalise() error {
274
275	if !globalData.initialised {
276		return fault.NotInitialised
277	}
278
279	globalData.log.Info("shutting down…")
280	globalData.log.Flush()
281
282	// stop background
283	globalData.background.Stop()
284
285	// save data
286	saveToFile()
287
288	// finally...
289	globalData.initialised = false
290
291	globalData.log.Info("finished")
292	globalData.log.Flush()
293
294	return nil
295}
296
297// ReadCounters - for API to get status data
298func ReadCounters() (int, int) {
299	globalData.RLock()
300	pending := len(globalData.pendingIndex)
301	verified := len(globalData.verifiedIndex)
302	globalData.RUnlock()
303	return pending, verified
304}
305
// TransactionState - status enumeration
type TransactionState int

// list of all states, numbered consecutively from zero
const (
	StateUnknown TransactionState = iota
	StatePending
	StateVerified
	StateConfirmed
)

// String - string representation of a transaction state
//
// any value outside the list of states is reported as "Unknown"
func (state TransactionState) String() string {
	names := [...]string{
		StateUnknown:   "Unknown",
		StatePending:   "Pending",
		StateVerified:  "Verified",
		StateConfirmed: "Confirmed",
	}
	if state < 0 || int(state) >= len(names) {
		return "Unknown"
	}
	return names[state]
}
332
333// transactionStatus - get status of a transaction
334func transactionStatus(txId merkle.Digest) TransactionState {
335	globalData.RLock()
336	defer globalData.RUnlock()
337
338	_, ok := globalData.pendingIndex[txId]
339	if ok {
340		return StatePending
341	}
342
343	_, ok = globalData.verifiedIndex[txId]
344	if ok {
345		return StateVerified
346	}
347
348	if storage.Pool.Transactions.Has(txId[:]) {
349		return StateConfirmed
350	}
351
352	return StateUnknown
353}
354
355// move transaction(s) to verified cache
356func setVerified(payId pay.PayId, detail *PaymentDetail) bool {
357
358	if nil == detail {
359		globalData.log.Warn("payment was not provided")
360		return false
361	}
362
363	globalData.log.Infof("detail: currency: %s, amounts: %#v", detail.Currency, detail.Amounts)
364
365	// single transaction
366	if entry, ok := globalData.pendingTransactions[payId]; ok {
367		if !acceptablePayment(detail, entry.payments) {
368			globalData.log.Warnf("single transaction failed check for txid: %s  payid: %s", detail.TxID, payId)
369			return false
370		}
371		globalData.log.Infof("paid txid: %s  payid: %s", detail.TxID, payId)
372
373		delete(globalData.pendingTransactions, payId)
374		globalData.verifiedTransactions[payId] = entry.tx
375
376		txId := entry.tx.txId
377
378		delete(globalData.pendingIndex, txId)
379		globalData.verifiedIndex[txId] = payId
380
381		return true
382	}
383
384	// issue block
385	if entry, ok := globalData.pendingPaidIssues[payId]; ok {
386		if !acceptablePayment(detail, entry.payments) {
387			globalData.log.Warnf("issue block failed check for txid: %s  payid: %s", detail.TxID, payId)
388			return false
389		}
390		globalData.log.Infof("paid txid: %s  payid: %s", detail.TxID, payId)
391
392		globalData.pendingPaidCount -= len(entry.txs)
393		delete(globalData.pendingPaidIssues, payId)
394		globalData.verifiedPaidIssues[payId] = entry
395
396		for _, tx := range entry.txs {
397			txId := tx.txId
398
399			delete(globalData.pendingIndex, txId)
400			globalData.verifiedIndex[txId] = payId
401		}
402
403		return true
404	}
405
406	return false
407}
408
409// check that the incoming payment details match the stored payments records
410func acceptablePayment(detail *PaymentDetail, payments []transactionrecord.PaymentAlternative) bool {
411
412next_currency:
413	for _, p := range payments {
414		acceptable := true
415		for _, item := range p {
416			if item.Currency != detail.Currency {
417				continue next_currency
418			}
419			if detail.Amounts[item.Address] < item.Amount {
420				acceptable = false
421			}
422		}
423		if acceptable {
424			return true
425		}
426	}
427	return false
428}
429
430// SetTransferVerified - set verified if transaction found, otherwise preserv payment for later
431func SetTransferVerified(payId pay.PayId, detail *PaymentDetail) {
432	globalData.log.Infof("txid: %s  payid: %s", detail.TxID, payId)
433
434	globalData.Lock()
435	if !setVerified(payId, detail) {
436		globalData.log.Debugf("orphan payment: txid: %s  payid: %s", detail.TxID, payId)
437		globalData.orphanPayments[payId] = detail
438	}
439	globalData.Unlock()
440}
441
442// Disable - lock down to prevent proofer from getting data
443func Disable() {
444	globalData.Lock()
445	globalData.enabled = false
446	globalData.Unlock()
447}
448
449// Enable - allow proofer to run again
450func Enable() {
451	globalData.Lock()
452	globalData.enabled = true
453	globalData.Unlock()
454}
455
456// ClearSpend - reset spend map
457func ClearSpend() {
458	globalData.Lock()
459	defer globalData.Unlock()
460
461	if globalData.enabled {
462		logger.Panic("reservoir clear spend when not locked")
463	}
464
465	globalData.spend = make(map[spendKey]uint64)
466}
467
468// Rescan - before calling Enable may need to run rescan to drop any
469// invalidated transactions especially if the block height has changed
470func Rescan() {
471	globalData.Lock()
472	defer globalData.Unlock()
473
474	//empty the spend map
475	globalData.spend = make(map[spendKey]uint64)
476
477	// pending
478
479	for _, item := range globalData.pendingTransactions {
480		rescanItem(item.tx)
481	}
482	for _, item := range globalData.pendingFreeIssues {
483		for _, tx := range item.txs {
484			rescanItem(tx)
485		}
486	}
487	for _, item := range globalData.pendingPaidIssues {
488		for _, tx := range item.txs {
489			rescanItem(tx)
490		}
491	}
492
493	// verified
494
495	for _, tx := range globalData.verifiedTransactions {
496		rescanItem(tx)
497	}
498	for _, item := range globalData.verifiedFreeIssues {
499		for _, tx := range item.txs {
500			rescanItem(tx)
501		}
502	}
503	for _, item := range globalData.verifiedPaidIssues {
504		for _, tx := range item.txs {
505			rescanItem(tx)
506		}
507	}
508}
509
510func rescanItem(item *transactionData) {
511
512	txId := item.txId
513
514	// repack records to check signature is valid
515	switch tx := item.transaction.(type) {
516
517	case *transactionrecord.OldBaseData:
518		// should never be in the memory pool - so panic
519		logger.Panic("reservoir: rescan found: OldBaseData")
520
521	case *transactionrecord.AssetData:
522		// should never be in the memory pool - so panic
523		logger.Panic("reservoir: rescan found: AssetData")
524
525	case *transactionrecord.BitmarkIssue:
526		if storage.Pool.Transactions.Has(txId[:]) {
527			internalDeleteByTxId(txId)
528		}
529
530	case *transactionrecord.BitmarkTransferUnratified, *transactionrecord.BitmarkTransferCountersigned:
531		tr := tx.(transactionrecord.BitmarkTransfer)
532		link := tr.GetLink()
533		_, linkOwner := ownership.OwnerOf(nil, link)
534		if nil == linkOwner || !ownership.CurrentlyOwns(nil, linkOwner, link, storage.Pool.OwnerTxIndex) {
535			internalDeleteByTxId(txId)
536		}
537
538	case *transactionrecord.BlockFoundation:
539		// should never be in the memory pool - so panic
540		logger.Panic("reservoir: rescan found: BlockFoundation")
541
542	case *transactionrecord.BlockOwnerTransfer:
543		link := tx.Link
544		_, linkOwner := ownership.OwnerOf(nil, link)
545		if nil == linkOwner || !ownership.CurrentlyOwns(nil, linkOwner, link, storage.Pool.OwnerTxIndex) {
546			internalDeleteByTxId(txId)
547		}
548
549	case *transactionrecord.BitmarkShare:
550		link := tx.Link
551		_, linkOwner := ownership.OwnerOf(nil, link)
552		if nil == linkOwner || !ownership.CurrentlyOwns(nil, linkOwner, link, storage.Pool.OwnerTxIndex) {
553			internalDeleteByTxId(txId)
554		}
555
556	case *transactionrecord.ShareGrant:
557		_, err := CheckGrantBalance(nil, tx, storage.Pool.ShareQuantity)
558		if nil != err {
559			internalDeleteByTxId(txId)
560		} else {
561			k := makeSpendKey(tx.Owner, tx.ShareId)
562			globalData.spend[k] += tx.Quantity
563		}
564
565	case *transactionrecord.ShareSwap:
566		_, _, err := CheckSwapBalances(nil, tx, storage.Pool.ShareQuantity)
567		if nil != err {
568			internalDeleteByTxId(txId)
569		} else {
570			k := makeSpendKey(tx.OwnerOne, tx.ShareIdOne)
571			globalData.spend[k] += tx.QuantityOne
572			k = makeSpendKey(tx.OwnerTwo, tx.ShareIdTwo)
573			globalData.spend[k] += tx.QuantityTwo
574		}
575
576	default:
577		// undefined data in the memory pool - so panic
578		globalData.log.Criticalf("reservoir rescan unhandled transaction: %v", tx)
579		logger.Panicf("unhandled transaction: %v", tx)
580	}
581}
582
583// DeleteByTxId - remove a record using a transaction id
584// note, remove one issue in a block removes the whole issue block
585func DeleteByTxId(txId merkle.Digest) {
586	globalData.Lock()
587	defer globalData.Unlock()
588
589	if globalData.enabled {
590		logger.Panic("reservoir delete tx id when not locked")
591	}
592
593	internalDeleteByTxId(txId)
594}
595
596// non-locking version of above
597func internalDeleteByTxId(txId merkle.Digest) {
598	if payId, ok := globalData.pendingIndex[txId]; ok {
599		internalDelete(payId)
600	}
601	if payId, ok := globalData.verifiedIndex[txId]; ok {
602		internalDelete(payId)
603	}
604}
605
606// DeleteByLink - remove a record using a link id
607func DeleteByLink(link merkle.Digest) {
608	globalData.Lock()
609	defer globalData.Unlock()
610
611	if globalData.enabled {
612		logger.Panic("reservoir delete link when not locked")
613	}
614	if txId, ok := globalData.inProgressLinks[link]; ok {
615		if payId, ok := globalData.pendingIndex[txId]; ok {
616			internalDelete(payId)
617		}
618		if payId, ok := globalData.verifiedIndex[txId]; ok {
619			internalDelete(payId)
620		}
621	}
622}
623
624// delete all buffered transactions relating to a pay id
625// (after it has been confirmed)
626// Lock must be held before calling this
627func internalDelete(payId pay.PayId) {
628
629	// pending
630
631	if entry, ok := globalData.pendingTransactions[payId]; ok {
632		delete(globalData.pendingIndex, entry.tx.txId)
633		if transfer, ok := entry.tx.transaction.(transactionrecord.BitmarkTransfer); ok {
634			link := transfer.GetLink()
635			delete(globalData.inProgressLinks, link)
636		}
637		delete(globalData.pendingTransactions, payId)
638	}
639
640	if entry, ok := globalData.pendingFreeIssues[payId]; ok {
641		for _, tx := range entry.txs {
642			delete(globalData.pendingIndex, tx.txId)
643		}
644		globalData.pendingFreeCount -= len(entry.txs)
645		delete(globalData.pendingFreeIssues, payId)
646	}
647
648	if entry, ok := globalData.pendingPaidIssues[payId]; ok {
649		for _, tx := range entry.txs {
650			delete(globalData.pendingIndex, tx.txId)
651		}
652		globalData.pendingPaidCount -= len(entry.txs)
653		delete(globalData.pendingPaidIssues, payId)
654	}
655
656	// verified
657
658	if entry, ok := globalData.verifiedTransactions[payId]; ok {
659		delete(globalData.verifiedIndex, entry.txId)
660		if transfer, ok := entry.transaction.(transactionrecord.BitmarkTransfer); ok {
661			link := transfer.GetLink()
662			delete(globalData.inProgressLinks, link)
663		}
664		delete(globalData.verifiedTransactions, payId)
665	}
666
667	if entry, ok := globalData.verifiedFreeIssues[payId]; ok {
668		for _, tx := range entry.txs {
669			delete(globalData.verifiedIndex, tx.txId)
670		}
671		delete(globalData.verifiedFreeIssues, payId)
672	}
673
674	if entry, ok := globalData.verifiedPaidIssues[payId]; ok {
675		for _, tx := range entry.txs {
676			delete(globalData.verifiedIndex, tx.txId)
677		}
678		delete(globalData.verifiedPaidIssues, payId)
679	}
680}
681