1// Copyright 2017 The go-ethereum Authors
2// This file is part of the go-ethereum library.
3//
4// The go-ethereum library is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Lesser General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// The go-ethereum library is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Lesser General Public License for more details.
13//
14// You should have received a copy of the GNU Lesser General Public License
15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16
17package core
18
19import (
20	"errors"
21	"io"
22	"os"
23
24	"github.com/ethereum/go-ethereum/common"
25	"github.com/ethereum/go-ethereum/core/types"
26	"github.com/ethereum/go-ethereum/log"
27	"github.com/ethereum/go-ethereum/rlp"
28)
29
30// errNoActiveJournal is returned if a transaction is attempted to be inserted
31// into the journal, but no such file is currently open.
32var errNoActiveJournal = errors.New("no active journal")
33
34// devNull is a WriteCloser that just discards anything written into it. Its
35// goal is to allow the transaction journal to write into a fake journal when
36// loading transactions on startup without printing warnings due to no file
37// being read for write.
38type devNull struct{}
39
40func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
41func (*devNull) Close() error                      { return nil }
42
43// txJournal is a rotating log of transactions with the aim of storing locally
44// created transactions to allow non-executed ones to survive node restarts.
45type txJournal struct {
46	path   string         // Filesystem path to store the transactions at
47	writer io.WriteCloser // Output stream to write new transactions into
48}
49
50// newTxJournal creates a new transaction journal to
51func newTxJournal(path string) *txJournal {
52	return &txJournal{
53		path: path,
54	}
55}
56
57// load parses a transaction journal dump from disk, loading its contents into
58// the specified pool.
59func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
60	// Skip the parsing if the journal file doesn't exist at all
61	if _, err := os.Stat(journal.path); os.IsNotExist(err) {
62		return nil
63	}
64	// Open the journal for loading any past transactions
65	input, err := os.Open(journal.path)
66	if err != nil {
67		return err
68	}
69	defer input.Close()
70
71	// Temporarily discard any journal additions (don't double add on load)
72	journal.writer = new(devNull)
73	defer func() { journal.writer = nil }()
74
75	// Inject all transactions from the journal into the pool
76	stream := rlp.NewStream(input, 0)
77	total, dropped := 0, 0
78
79	// Create a method to load a limited batch of transactions and bump the
80	// appropriate progress counters. Then use this method to load all the
81	// journaled transactions in small-ish batches.
82	loadBatch := func(txs types.Transactions) {
83		for _, err := range add(txs) {
84			if err != nil {
85				log.Debug("Failed to add journaled transaction", "err", err)
86				dropped++
87			}
88		}
89	}
90	var (
91		failure error
92		batch   types.Transactions
93	)
94	for {
95		// Parse the next transaction and terminate on error
96		tx := new(types.Transaction)
97		if err = stream.Decode(tx); err != nil {
98			if err != io.EOF {
99				failure = err
100			}
101			if batch.Len() > 0 {
102				loadBatch(batch)
103			}
104			break
105		}
106		// New transaction parsed, queue up for later, import if threshold is reached
107		total++
108
109		if batch = append(batch, tx); batch.Len() > 1024 {
110			loadBatch(batch)
111			batch = batch[:0]
112		}
113	}
114	log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
115
116	return failure
117}
118
119// insert adds the specified transaction to the local disk journal.
120func (journal *txJournal) insert(tx *types.Transaction) error {
121	if journal.writer == nil {
122		return errNoActiveJournal
123	}
124	if err := rlp.Encode(journal.writer, tx); err != nil {
125		return err
126	}
127	return nil
128}
129
130// rotate regenerates the transaction journal based on the current contents of
131// the transaction pool.
132func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
133	// Close the current journal (if any is open)
134	if journal.writer != nil {
135		if err := journal.writer.Close(); err != nil {
136			return err
137		}
138		journal.writer = nil
139	}
140	// Generate a new journal with the contents of the current pool
141	replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
142	if err != nil {
143		return err
144	}
145	journaled := 0
146	for _, txs := range all {
147		for _, tx := range txs {
148			if err = rlp.Encode(replacement, tx); err != nil {
149				replacement.Close()
150				return err
151			}
152		}
153		journaled += len(txs)
154	}
155	replacement.Close()
156
157	// Replace the live journal with the newly generated one
158	if err = os.Rename(journal.path+".new", journal.path); err != nil {
159		return err
160	}
161	sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644)
162	if err != nil {
163		return err
164	}
165	journal.writer = sink
166	log.Info("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))
167
168	return nil
169}
170
171// close flushes the transaction journal contents to disk and closes the file.
172func (journal *txJournal) close() error {
173	var err error
174
175	if journal.writer != nil {
176		err = journal.writer.Close()
177		journal.writer = nil
178	}
179	return err
180}
181