package networkdb

//go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto

import (
	"context"
	"fmt"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/armon/go-radix"
	"github.com/docker/docker/pkg/stringid"
	"github.com/docker/go-events"
	"github.com/docker/libnetwork/types"
	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/serf"
	"github.com/sirupsen/logrus"
)

const (
	byTable int = 1 + iota
	byNetwork
)
// NetworkDB instance drives the networkdb cluster and acts as the broker
// for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
	// The clocks MUST be the first things
	// in this struct due to Golang issue #599.

	// Global lamport clock for node network attach events.
	networkClock serf.LamportClock

	// Global lamport clock for table events.
	tableClock serf.LamportClock

	sync.RWMutex

	// NetworkDB configuration.
	config *Config

	// All the tree indexes (byTable, byNetwork) that we maintain
	// for the db.
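	// The byTable index keys entries as /<table>/<network>/<key>, while the
	// byNetwork index keys the same entries as /<network>/<table>/<key>
	// (see createOrUpdateEntry).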
	indexes map[int]*radix.Tree

	// Memberlist we use to drive the cluster.
	memberlist *memberlist.Memberlist

	// List of all peer nodes in the cluster, not limited to any
	// network.
	nodes map[string]*node

	// List of all peer nodes which have failed
	failedNodes map[string]*node

	// List of all peer nodes which have left
	leftNodes map[string]*node

	// A multi-dimensional map of network/node attachments. The
	// first key is a node name and the second key is a network ID
	// for the network that node is participating in.
	networks map[string]map[string]*network

	// A map of nodes which are participating in a given
	// network. The key is a network ID.
	networkNodes map[string][]string

	// A table of ack channels for every node from which we are
	// waiting for an ack.
	bulkSyncAckTbl map[string]chan struct{}

	// Broadcast queue for network event gossip.
	networkBroadcasts *memberlist.TransmitLimitedQueue

	// Broadcast queue for node event gossip.
	nodeBroadcasts *memberlist.TransmitLimitedQueue

	// A central context to stop all go routines running on
	// behalf of the NetworkDB instance.
	ctx       context.Context
	cancelCtx context.CancelFunc

	// A central broadcaster for all local watchers watching table
	// events.
	broadcaster *events.Broadcaster

	// List of all tickers which need to be stopped when
	// cleaning up.
	tickers []*time.Ticker

	// Reference to the memberlist's keyring to add & remove keys
	keyring *memberlist.Keyring

	// bootStrapIP is the list of IPs that can be used to bootstrap
	// the gossip.
	bootStrapIP []net.IP

	// lastStatsTimestamp is the last timestamp when the stats got printed
	lastStatsTimestamp time.Time

	// lastHealthTimestamp is the last timestamp when the health score got printed
	lastHealthTimestamp time.Time
}

// PeerInfo represents the peer (gossip cluster) nodes of a network
type PeerInfo struct {
	Name string
	IP   string
}

// PeerClusterInfo represents the peer (gossip cluster) nodes
type PeerClusterInfo struct {
	PeerInfo
}

type node struct {
	memberlist.Node
	ltime serf.LamportTime
	// Number of hours left before the reaper removes the node
	reapTime time.Duration
}

// network describes the node/network attachment.
type network struct {
	// Network ID
	id string

	// Lamport time for the latest state of the entry.
	ltime serf.LamportTime

	// Node leave is in progress.
	leaving bool

	// Number of seconds still left before a deleted network entry gets
	// removed from networkDB
	reapTime time.Duration

	// The broadcast queue for table event gossip. This is only
	// initialized for this node's network attachment entries.
	tableBroadcasts *memberlist.TransmitLimitedQueue

	// Number of gossip messages sent related to this network during the last stats collection period
	qMessagesSent int

	// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
	// Its use is for statistics purposes. It keeps track of the database size and is printed per network every StatsPrintPeriod
	// interval
	entriesNumber int
}

// Config represents the configuration of the networkdb instance and
// can be passed by the caller.
type Config struct {
	// NodeID is the node unique identifier of the node when it is part of the cluster
	NodeID string

	// Hostname is the node hostname.
	Hostname string

	// BindAddr is the IP on which networkdb listens. It can be
	// 0.0.0.0 to listen on all addresses on the host.
	BindAddr string

	// AdvertiseAddr is the node's IP address that we advertise for
	// cluster communication.
	AdvertiseAddr string

	// BindPort is the local node's port to which we bind for
	// cluster communication.
	BindPort int

	// Keys to be added to the Keyring of the memberlist. Key at index
	// 0 is the primary key
	Keys [][]byte

	// PacketBufferSize is the maximum number of bytes that memberlist will
	// put in a packet (this will be for UDP packets by default with a NetTransport).
	// A safe value for this is typically 1400 bytes (which is the default). However,
	// depending on your network's MTU (Maximum Transmission Unit) you may
	// be able to increase this to get more content into each gossip packet.
	PacketBufferSize int

	// reapEntryInterval is the duration a deleted entry lingers before being garbage collected
	reapEntryInterval time.Duration

	// reapNetworkInterval is the duration a deleted network lingers before being garbage collected
	// NOTE this MUST always be higher than reapEntryInterval
	reapNetworkInterval time.Duration

	// StatsPrintPeriod the period to use to print queue stats
	// Default is 5min
	StatsPrintPeriod time.Duration

	// HealthPrintPeriod the period to use to print the health score
	// Default is 1min
	HealthPrintPeriod time.Duration
}

// entry defines a table entry
type entry struct {
	// node from which this entry was learned.
	node string

	// Lamport time for the most recent update to the entry
	ltime serf.LamportTime

	// Opaque value stored in the entry
	value []byte

	// Deleting the entry is in progress. All entries linger in
	// the cluster for a certain amount of time after deletion.
	deleting bool

	// Number of seconds still left before a deleted table entry gets
	// removed from networkDB
	reapTime time.Duration
}

// DefaultConfig returns a NetworkDB config with default values
func DefaultConfig() *Config {
	hostname, _ := os.Hostname()
	return &Config{
		NodeID:            stringid.TruncateID(stringid.GenerateRandomID()),
		Hostname:          hostname,
		BindAddr:          "0.0.0.0",
		PacketBufferSize:  1400,
		StatsPrintPeriod:  5 * time.Minute,
		HealthPrintPeriod: 1 * time.Minute,
		reapEntryInterval: 30 * time.Minute,
	}
}

// New creates a new instance of NetworkDB using the Config passed by
// the caller.
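//
// A minimal usage sketch (the bind port and peer address below are
// hypothetical, shown for illustration only):
//
//	conf := DefaultConfig()
//	conf.BindPort = 7946
//	db, err := New(conf)
//	if err != nil {
//		// handle error
//	}
//	defer db.Close()
//	if err := db.Join([]string{"10.0.0.2:7946"}); err != nil {
//		// handle error
//	}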
func New(c *Config) (*NetworkDB, error) {
	// The garbage collection logic for entries leverages the presence of the network.
	// For this reason the expiration time of the network is set slightly higher than the entry expiration, so that
	// there are at least 5 extra cycles to make sure that all the entries are properly deleted before deleting the network.
	c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod

	nDB := &NetworkDB{
		config:         c,
		indexes:        make(map[int]*radix.Tree),
		networks:       make(map[string]map[string]*network),
		nodes:          make(map[string]*node),
		failedNodes:    make(map[string]*node),
		leftNodes:      make(map[string]*node),
		networkNodes:   make(map[string][]string),
		bulkSyncAckTbl: make(map[string]chan struct{}),
		broadcaster:    events.NewBroadcaster(),
	}

	nDB.indexes[byTable] = radix.New()
	nDB.indexes[byNetwork] = radix.New()

	logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
	if err := nDB.clusterInit(); err != nil {
		return nil, err
	}

	return nDB, nil
}

// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port
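//
// For example (the addresses are hypothetical):
//
//	err := db.Join([]string{"10.0.0.2:7946", "10.0.0.3:7946"})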
func (nDB *NetworkDB) Join(members []string) error {
	nDB.Lock()
	nDB.bootStrapIP = make([]net.IP, 0, len(members))
	for _, m := range members {
		nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
	}
	nDB.Unlock()
	return nDB.clusterJoin(members)
}

// Close destroys this NetworkDB instance by leaving the cluster,
// stopping timers, canceling goroutines etc.
func (nDB *NetworkDB) Close() {
	if err := nDB.clusterLeave(); err != nil {
		logrus.Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err)
	}

	// Avoid (*Broadcaster).run goroutine leak
	nDB.broadcaster.Close()
}

// ClusterPeers returns all the gossip cluster peers.
func (nDB *NetworkDB) ClusterPeers() []PeerInfo {
	nDB.RLock()
	defer nDB.RUnlock()
	peers := make([]PeerInfo, 0, len(nDB.nodes))
	for _, node := range nDB.nodes {
		peers = append(peers, PeerInfo{
			Name: node.Name,
			IP:   node.Node.Addr.String(),
		})
	}
	return peers
}

// Peers returns the gossip peers for a given network.
func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
	nDB.RLock()
	defer nDB.RUnlock()
	peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
	for _, nodeName := range nDB.networkNodes[nid] {
		if node, ok := nDB.nodes[nodeName]; ok {
			peers = append(peers, PeerInfo{
				Name: node.Name,
				IP:   node.Addr.String(),
			})
		} else {
			// Added for testing purposes; this condition should never happen, otherwise it means
			// that the network list is out of sync with the node list
			peers = append(peers, PeerInfo{Name: nodeName, IP: "unknown"})
		}
	}
	return peers
}

// GetEntry retrieves the value of a table entry in a given (network,
// table, key) tuple
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
	nDB.RLock()
	defer nDB.RUnlock()
	entry, err := nDB.getEntry(tname, nid, key)
	if err != nil {
		return nil, err
	}
	if entry != nil && entry.deleting {
		return nil, types.NotFoundErrorf("entry in table %s network id %s and key %s deleted and pending garbage collection", tname, nid, key)
	}

	return entry.value, nil
}

func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
	e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
	if !ok {
		return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
	}

	return e.(*entry), nil
}

// CreateEntry creates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster. It is an error to create an
// entry for the same tuple for which there is already an existing
// entry unless the current entry is in deleting state.
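//
// A sketch of the create/read flow (table, network ID and key are
// hypothetical):
//
//	if err := db.CreateEntry("endpoint_table", "n1", "ep1", []byte("value")); err != nil {
//		// handle error
//	}
//	value, err := db.GetEntry("endpoint_table", "n1", "ep1")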
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
	nDB.Lock()
	oldEntry, err := nDB.getEntry(tname, nid, key)
	if err == nil || (oldEntry != nil && !oldEntry.deleting) {
		nDB.Unlock()
		return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
	}

	entry := &entry{
		ltime: nDB.tableClock.Increment(),
		node:  nDB.config.NodeID,
		value: value,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
	}

	return nil
}

// UpdateEntry updates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster. It is an error to update a
// non-existent entry.
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
	nDB.Lock()
	if _, err := nDB.getEntry(tname, nid, key); err != nil {
		nDB.Unlock()
		return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
	}

	entry := &entry{
		ltime: nDB.tableClock.Increment(),
		node:  nDB.config.NodeID,
		value: value,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send table update event: %v", err)
	}

	return nil
}

// TableElem represents a single table entry as returned by GetTableByNetwork
type TableElem struct {
	Value []byte
	owner string
}

// GetTableByNetwork walks the networkdb by the given table and network id and
// returns a map of keys and values
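//
// For example (the table name and network ID are hypothetical):
//
//	for key, elem := range db.GetTableByNetwork("endpoint_table", "n1") {
//		fmt.Printf("%s -> %d bytes\n", key, len(elem.Value))
//	}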
func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem {
	entries := make(map[string]*TableElem)
	nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
		entry := v.(*entry)
		if entry.deleting {
			return false
		}
		key := k[strings.LastIndex(k, "/")+1:]
		entries[key] = &TableElem{Value: entry.value, owner: entry.node}
		return false
	})
	return entries
}

// DeleteEntry deletes a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propagates this event to the cluster.
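//
// Note that the entry is not removed right away: it is marked as deleting
// and lingers for reapEntryInterval before being garbage collected, during
// which time GetEntry for the same tuple returns a not-found error.
//
// For example (table, network ID and key are hypothetical):
//
//	err := db.DeleteEntry("endpoint_table", "n1", "ep1")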
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
	nDB.Lock()
	oldEntry, err := nDB.getEntry(tname, nid, key)
	if err != nil || oldEntry == nil || oldEntry.deleting {
		nDB.Unlock()
		return fmt.Errorf("cannot delete entry in table %s with network id %s and key %s, "+
			"either it does not exist or it is already being deleted", tname, nid, key)
	}

	entry := &entry{
		ltime:    nDB.tableClock.Increment(),
		node:     nDB.config.NodeID,
		value:    oldEntry.value,
		deleting: true,
		reapTime: nDB.config.reapEntryInterval,
	}

	nDB.createOrUpdateEntry(nid, tname, key, entry)
	nDB.Unlock()

	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
		return fmt.Errorf("cannot send table delete event: %v", err)
	}

	return nil
}

func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) {
	for nid, nodes := range nDB.networkNodes {
		updatedNodes := make([]string, 0, len(nodes))
		for _, node := range nodes {
			if node == deletedNode {
				continue
			}

			updatedNodes = append(updatedNodes, node)
		}

		nDB.networkNodes[nid] = updatedNodes
	}

	delete(nDB.networks, deletedNode)
}

// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes:
// 1) when a notification arrives that a node is leaving the network
//		- Walk all the network entries and mark the leaving node's entries for deletion.
//		  These will be garbage collected when the reap timer expires.
// 2) when the local node is leaving the network
//		- Walk all the network entries:
//			A) if the entry is owned by the local node
//		  then we will mark it for deletion. This ensures that a node which has not
//		  yet received the notification that the local node is leaving will still be
//		  aware of the entries to be deleted.
//			B) if the entry is owned by a remote node, then we can safely delete it. This
//			ensures that if we join back this network, as we receive the CREATE event for
//		  entries owned by remote nodes, we will accept them and notify the application
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
	// Indicates if the delete is triggered for the local node
	isNodeLocal := node == nDB.config.NodeID

	nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
		func(path string, v interface{}) bool {
			oldEntry := v.(*entry)
			params := strings.Split(path[1:], "/")
			nid := params[0]
			tname := params[1]
			key := params[2]

			// If the entry is owned by a remote node and this node is not leaving the network
			if oldEntry.node != node && !isNodeLocal {
				// Don't do anything because the event is triggered for a node that does not own this entry
				return false
			}

			// If this entry is already marked for deletion and this node is not leaving the network
			if oldEntry.deleting && !isNodeLocal {
				// Don't do anything; this entry will already be garbage collected using the old reapTime
				return false
			}

			entry := &entry{
				ltime:    oldEntry.ltime,
				node:     oldEntry.node,
				value:    oldEntry.value,
				deleting: true,
				reapTime: nDB.config.reapEntryInterval,
			}

			// we arrive at this point in 2 cases:
			// 1) this entry is owned by the node that is leaving the network
			// 2) the local node is leaving the network
			if oldEntry.node == node {
				if isNodeLocal {
					// TODO fcrisciani: this can be removed if there is no way to leave the network
					// without doing a delete of all the objects
					entry.ltime++
				}

				if !oldEntry.deleting {
					nDB.createOrUpdateEntry(nid, tname, key, entry)
				}
			} else {
				// the local node is leaving the network, all the entries of remote nodes can be safely removed
				nDB.deleteEntry(nid, tname, key)
			}

			// Notify the upper layer only of entries not already marked for deletion
			if !oldEntry.deleting {
				nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
			}
			return false
		})
}

func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
		oldEntry := v.(*entry)
		if oldEntry.node != node {
			return false
		}

		params := strings.Split(path[1:], "/")
		tname := params[0]
		nid := params[1]
		key := params[2]

		nDB.deleteEntry(nid, tname, key)

		if !oldEntry.deleting {
			nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
		}
		return false
	})
}

// WalkTable walks a single table in NetworkDB and invokes the passed
// function for each entry in the table, passing the network ID, key,
// value and a flag indicating whether the entry is being deleted. The
// walk stops if the passed function returns true.
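//
// For example (the table name is hypothetical):
//
//	_ = db.WalkTable("endpoint_table", func(nid, key string, value []byte, deleting bool) bool {
//		fmt.Printf("network %s key %s deleting=%v\n", nid, key, deleting)
//		return false // continue walking
//	})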
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
	nDB.RLock()
	values := make(map[string]interface{})
	nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
		values[path] = v
		return false
	})
	nDB.RUnlock()

	for k, v := range values {
		params := strings.Split(k[1:], "/")
		nid := params[1]
		key := params[2]
		if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
			return nil
		}
	}

	return nil
}

// JoinNetwork joins this node to a given network and propagates this
// event across the cluster. This triggers this node joining the
// sub-cluster of this network and participating in the network-scoped
// gossip and bulk sync for this network.
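//
// A typical sequence (the network ID and table values are hypothetical):
//
//	if err := db.JoinNetwork("n1"); err != nil {
//		// handle error
//	}
//	// entries scoped to "n1" can now be created and will be gossiped
//	// to the other nodes attached to the same network
//	_ = db.CreateEntry("endpoint_table", "n1", "ep1", []byte("value"))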
func (nDB *NetworkDB) JoinNetwork(nid string) error {
	ltime := nDB.networkClock.Increment()

	nDB.Lock()
	nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
	if !ok {
		nodeNetworks = make(map[string]*network)
		nDB.networks[nDB.config.NodeID] = nodeNetworks
	}
	n, ok := nodeNetworks[nid]
	var entries int
	if ok {
		entries = n.entriesNumber
	}
	nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
	nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			// TODO fcrisciani: this can be optimized, maybe avoiding the lock?
			// this call is done on each GetBroadcasts call to evaluate the number of
			// replicas for the message
			nDB.RLock()
			defer nDB.RUnlock()
			return len(nDB.networkNodes[nid])
		},
		RetransmitMult: 4,
	}
	nDB.addNetworkNode(nid, nDB.config.NodeID)
	networkNodes := nDB.networkNodes[nid]
	nDB.Unlock()

	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
		return fmt.Errorf("failed to send join network event for %s: %v", nid, err)
	}

	logrus.Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
	if _, err := nDB.bulkSync(networkNodes, true); err != nil {
		logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
	}

	return nil
}

// LeaveNetwork removes this node from a given network and propagates
// this event across the cluster. This triggers this node leaving the
// sub-cluster of this network and as a result it will no longer
// participate in the network-scoped gossip and bulk sync for this
// network. It also removes all the table entries for this network from
// networkdb
func (nDB *NetworkDB) LeaveNetwork(nid string) error {
	ltime := nDB.networkClock.Increment()
	if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
		return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
	}

	nDB.Lock()
	defer nDB.Unlock()

	// Remove myself from the list of the nodes participating in the network
	nDB.deleteNetworkNode(nid, nDB.config.NodeID)

	// Update all the local entries marking them for deletion and delete all the remote entries
	nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID)

	nodeNetworks, ok := nDB.networks[nDB.config.NodeID]
	if !ok {
		return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
	}

	n, ok := nodeNetworks[nid]
	if !ok {
		return fmt.Errorf("could not find network %s while trying to leave", nid)
	}

	logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
	n.ltime = ltime
	n.reapTime = nDB.config.reapNetworkInterval
	n.leaving = true
	return nil
}

// addNetworkNode adds the node to the list of nodes which participate
// in the passed network only if it is not already present. Caller
// should hold the NetworkDB lock while calling this
func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
	nodes := nDB.networkNodes[nid]
	for _, node := range nodes {
		if node == nodeName {
			return
		}
	}

	nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
}

// Deletes the node from the list of nodes which participate in the
// passed network. Caller should hold the NetworkDB lock while calling
// this
func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
	nodes, ok := nDB.networkNodes[nid]
	if !ok || len(nodes) == 0 {
		return
	}
	newNodes := make([]string, 0, len(nodes)-1)
	for _, name := range nodes {
		if name == nodeName {
			continue
		}
		newNodes = append(newNodes, name)
	}
	nDB.networkNodes[nid] = newNodes
}

// findCommonNetworks finds the networks that both this node and the
// passed node have joined.
func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
	nDB.RLock()
	defer nDB.RUnlock()

	var networks []string
	for nid := range nDB.networks[nDB.config.NodeID] {
		if n, ok := nDB.networks[nodeName][nid]; ok {
			if !n.leaving {
				networks = append(networks, nid)
			}
		}
	}

	return networks
}

func (nDB *NetworkDB) updateLocalNetworkTime() {
	nDB.Lock()
	defer nDB.Unlock()

	ltime := nDB.networkClock.Increment()
	for _, n := range nDB.networks[nDB.config.NodeID] {
		n.ltime = ltime
	}
}

// createOrUpdateEntry handles the creation or update of entries in the local
// tree store. It also keeps the network's entry count (all tables aggregated) in sync.
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
	_, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
	_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
	if !okNetwork {
		// Add only if it is an insert, not an update
		n, ok := nDB.networks[nDB.config.NodeID][nid]
		if ok {
			n.entriesNumber++
		}
	}
	return okTable, okNetwork
}

// deleteEntry handles the deletion of entries from the local tree store.
// It also keeps the network's entry count (all tables aggregated) in sync.
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
	_, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
	_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
	if okNetwork {
		// Remove only if the delete is successful
		n, ok := nDB.networks[nDB.config.NodeID][nid]
		if ok {
			n.entriesNumber--
		}
	}
	return okTable, okNetwork
}