1// Copyright 2015 The go-ethereum Authors
2// This file is part of the go-ethereum library.
3//
4// The go-ethereum library is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Lesser General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// The go-ethereum library is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Lesser General Public License for more details.
13//
14// You should have received a copy of the GNU Lesser General Public License
15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16
17package trie
18
19import (
20	"errors"
21	"fmt"
22
23	"github.com/ethereum/go-ethereum/common"
24	"github.com/ethereum/go-ethereum/common/prque"
25	"github.com/ethereum/go-ethereum/ethdb"
26)
27
var (
	// ErrNotRequested is returned by the trie sync when it is handed data for
	// a node it never asked for.
	ErrNotRequested = errors.New("not requested")

	// ErrAlreadyProcessed is returned by the trie sync when it is handed data
	// for a node whose data it has already accepted.
	ErrAlreadyProcessed = errors.New("already processed")
)
35
36// request represents a scheduled or already in-flight state retrieval request.
37type request struct {
38	hash common.Hash // Hash of the node data content to retrieve
39	data []byte      // Data content of the node, cached until all subtrees complete
40	raw  bool        // Whether this is a raw entry (code) or a trie node
41
42	parents []*request // Parent state nodes referencing this entry (notify all upon completion)
43	depth   int        // Depth level within the trie the node is located to prioritise DFS
44	deps    int        // Number of dependencies before allowed to commit this node
45
46	callback LeafCallback // Callback to invoke if a leaf node it reached on this branch
47}
48
49// SyncResult is a simple list to return missing nodes along with their request
50// hashes.
51type SyncResult struct {
52	Hash common.Hash // Hash of the originally unknown trie node
53	Data []byte      // Data content of the retrieved node
54}
55
56// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
57// persisted data items.
58type syncMemBatch struct {
59	batch map[common.Hash][]byte // In-memory membatch of recently completed items
60	order []common.Hash          // Order of completion to prevent out-of-order data loss
61}
62
63// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
64func newSyncMemBatch() *syncMemBatch {
65	return &syncMemBatch{
66		batch: make(map[common.Hash][]byte),
67		order: make([]common.Hash, 0, 256),
68	}
69}
70
71// Sync is the main state trie synchronisation scheduler, which provides yet
72// unknown trie hashes to retrieve, accepts node data associated with said hashes
73// and reconstructs the trie step by step until all is done.
74type Sync struct {
75	database DatabaseReader           // Persistent database to check for existing entries
76	membatch *syncMemBatch            // Memory buffer to avoid frequent database writes
77	requests map[common.Hash]*request // Pending requests pertaining to a key hash
78	queue    *prque.Prque             // Priority queue with the pending requests
79}
80
81// NewSync creates a new trie data download scheduler.
82func NewSync(root common.Hash, database DatabaseReader, callback LeafCallback) *Sync {
83	ts := &Sync{
84		database: database,
85		membatch: newSyncMemBatch(),
86		requests: make(map[common.Hash]*request),
87		queue:    prque.New(nil),
88	}
89	ts.AddSubTrie(root, 0, common.Hash{}, callback)
90	return ts
91}
92
93// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
94func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) {
95	// Short circuit if the trie is empty or already known
96	if root == emptyRoot {
97		return
98	}
99	if _, ok := s.membatch.batch[root]; ok {
100		return
101	}
102	key := root.Bytes()
103	blob, _ := s.database.Get(key)
104	if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
105		return
106	}
107	// Assemble the new sub-trie sync request
108	req := &request{
109		hash:     root,
110		depth:    depth,
111		callback: callback,
112	}
113	// If this sub-trie has a designated parent, link them together
114	if parent != (common.Hash{}) {
115		ancestor := s.requests[parent]
116		if ancestor == nil {
117			panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
118		}
119		ancestor.deps++
120		req.parents = append(req.parents, ancestor)
121	}
122	s.schedule(req)
123}
124
125// AddRawEntry schedules the direct retrieval of a state entry that should not be
126// interpreted as a trie node, but rather accepted and stored into the database
127// as is. This method's goal is to support misc state metadata retrievals (e.g.
128// contract code).
129func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) {
130	// Short circuit if the entry is empty or already known
131	if hash == emptyState {
132		return
133	}
134	if _, ok := s.membatch.batch[hash]; ok {
135		return
136	}
137	if ok, _ := s.database.Has(hash.Bytes()); ok {
138		return
139	}
140	// Assemble the new sub-trie sync request
141	req := &request{
142		hash:  hash,
143		raw:   true,
144		depth: depth,
145	}
146	// If this sub-trie has a designated parent, link them together
147	if parent != (common.Hash{}) {
148		ancestor := s.requests[parent]
149		if ancestor == nil {
150			panic(fmt.Sprintf("raw-entry ancestor not found: %x", parent))
151		}
152		ancestor.deps++
153		req.parents = append(req.parents, ancestor)
154	}
155	s.schedule(req)
156}
157
158// Missing retrieves the known missing nodes from the trie for retrieval.
159func (s *Sync) Missing(max int) []common.Hash {
160	requests := []common.Hash{}
161	for !s.queue.Empty() && (max == 0 || len(requests) < max) {
162		requests = append(requests, s.queue.PopItem().(common.Hash))
163	}
164	return requests
165}
166
167// Process injects a batch of retrieved trie nodes data, returning if something
168// was committed to the database and also the index of an entry if processing of
169// it failed.
170func (s *Sync) Process(results []SyncResult) (bool, int, error) {
171	committed := false
172
173	for i, item := range results {
174		// If the item was not requested, bail out
175		request := s.requests[item.Hash]
176		if request == nil {
177			return committed, i, ErrNotRequested
178		}
179		if request.data != nil {
180			return committed, i, ErrAlreadyProcessed
181		}
182		// If the item is a raw entry request, commit directly
183		if request.raw {
184			request.data = item.Data
185			s.commit(request)
186			committed = true
187			continue
188		}
189		// Decode the node data content and update the request
190		node, err := decodeNode(item.Hash[:], item.Data, 0)
191		if err != nil {
192			return committed, i, err
193		}
194		request.data = item.Data
195
196		// Create and schedule a request for all the children nodes
197		requests, err := s.children(request, node)
198		if err != nil {
199			return committed, i, err
200		}
201		if len(requests) == 0 && request.deps == 0 {
202			s.commit(request)
203			committed = true
204			continue
205		}
206		request.deps += len(requests)
207		for _, child := range requests {
208			s.schedule(child)
209		}
210	}
211	return committed, 0, nil
212}
213
214// Commit flushes the data stored in the internal membatch out to persistent
215// storage, returning the number of items written and any occurred error.
216func (s *Sync) Commit(dbw ethdb.Putter) (int, error) {
217	// Dump the membatch into a database dbw
218	for i, key := range s.membatch.order {
219		if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil {
220			return i, err
221		}
222	}
223	written := len(s.membatch.order)
224
225	// Drop the membatch data and return
226	s.membatch = newSyncMemBatch()
227	return written, nil
228}
229
230// Pending returns the number of state entries currently pending for download.
231func (s *Sync) Pending() int {
232	return len(s.requests)
233}
234
235// schedule inserts a new state retrieval request into the fetch queue. If there
236// is already a pending request for this node, the new request will be discarded
237// and only a parent reference added to the old one.
238func (s *Sync) schedule(req *request) {
239	// If we're already requesting this node, add a new reference and stop
240	if old, ok := s.requests[req.hash]; ok {
241		old.parents = append(old.parents, req.parents...)
242		return
243	}
244	// Schedule the request for future retrieval
245	s.queue.Push(req.hash, int64(req.depth))
246	s.requests[req.hash] = req
247}
248
249// children retrieves all the missing children of a state trie entry for future
250// retrieval scheduling.
251func (s *Sync) children(req *request, object node) ([]*request, error) {
252	// Gather all the children of the node, irrelevant whether known or not
253	type child struct {
254		node  node
255		depth int
256	}
257	children := []child{}
258
259	switch node := (object).(type) {
260	case *shortNode:
261		children = []child{{
262			node:  node.Val,
263			depth: req.depth + len(node.Key),
264		}}
265	case *fullNode:
266		for i := 0; i < 17; i++ {
267			if node.Children[i] != nil {
268				children = append(children, child{
269					node:  node.Children[i],
270					depth: req.depth + 1,
271				})
272			}
273		}
274	default:
275		panic(fmt.Sprintf("unknown node: %+v", node))
276	}
277	// Iterate over the children, and request all unknown ones
278	requests := make([]*request, 0, len(children))
279	for _, child := range children {
280		// Notify any external watcher of a new key/value node
281		if req.callback != nil {
282			if node, ok := (child.node).(valueNode); ok {
283				if err := req.callback(node, req.hash); err != nil {
284					return nil, err
285				}
286			}
287		}
288		// If the child references another node, resolve or schedule
289		if node, ok := (child.node).(hashNode); ok {
290			// Try to resolve the node from the local database
291			hash := common.BytesToHash(node)
292			if _, ok := s.membatch.batch[hash]; ok {
293				continue
294			}
295			if ok, _ := s.database.Has(node); ok {
296				continue
297			}
298			// Locally unknown node, schedule for retrieval
299			requests = append(requests, &request{
300				hash:     hash,
301				parents:  []*request{req},
302				depth:    child.depth,
303				callback: req.callback,
304			})
305		}
306	}
307	return requests, nil
308}
309
310// commit finalizes a retrieval request and stores it into the membatch. If any
311// of the referencing parent requests complete due to this commit, they are also
312// committed themselves.
313func (s *Sync) commit(req *request) (err error) {
314	// Write the node content to the membatch
315	s.membatch.batch[req.hash] = req.data
316	s.membatch.order = append(s.membatch.order, req.hash)
317
318	delete(s.requests, req.hash)
319
320	// Check all parents for completion
321	for _, parent := range req.parents {
322		parent.deps--
323		if parent.deps == 0 {
324			if err := s.commit(parent); err != nil {
325				return err
326			}
327		}
328	}
329	return nil
330}
331