1package networkdb 2 3//go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto 4 5import ( 6 "context" 7 "fmt" 8 "net" 9 "os" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/armon/go-radix" 15 "github.com/docker/docker/pkg/stringid" 16 "github.com/docker/go-events" 17 "github.com/docker/libnetwork/types" 18 "github.com/hashicorp/memberlist" 19 "github.com/hashicorp/serf/serf" 20 "github.com/sirupsen/logrus" 21) 22 23const ( 24 byTable int = 1 + iota 25 byNetwork 26) 27 28// NetworkDB instance drives the networkdb cluster and acts the broker 29// for cluster-scoped and network-scoped gossip and watches. 30type NetworkDB struct { 31 // The clocks MUST be the first things 32 // in this struct due to Golang issue #599. 33 34 // Global lamport clock for node network attach events. 35 networkClock serf.LamportClock 36 37 // Global lamport clock for table events. 38 tableClock serf.LamportClock 39 40 sync.RWMutex 41 42 // NetworkDB configuration. 43 config *Config 44 45 // All the tree index (byTable, byNetwork) that we maintain 46 // the db. 47 indexes map[int]*radix.Tree 48 49 // Memberlist we use to drive the cluster. 50 memberlist *memberlist.Memberlist 51 52 // List of all peer nodes in the cluster not-limited to any 53 // network. 54 nodes map[string]*node 55 56 // List of all peer nodes which have failed 57 failedNodes map[string]*node 58 59 // List of all peer nodes which have left 60 leftNodes map[string]*node 61 62 // A multi-dimensional map of network/node attachmemts. The 63 // first key is a node name and the second key is a network ID 64 // for the network that node is participating in. 65 networks map[string]map[string]*network 66 67 // A map of nodes which are participating in a given 68 // network. The key is a network ID. 
69 networkNodes map[string][]string 70 71 // A table of ack channels for every node from which we are 72 // waiting for an ack. 73 bulkSyncAckTbl map[string]chan struct{} 74 75 // Broadcast queue for network event gossip. 76 networkBroadcasts *memberlist.TransmitLimitedQueue 77 78 // Broadcast queue for node event gossip. 79 nodeBroadcasts *memberlist.TransmitLimitedQueue 80 81 // A central context to stop all go routines running on 82 // behalf of the NetworkDB instance. 83 ctx context.Context 84 cancelCtx context.CancelFunc 85 86 // A central broadcaster for all local watchers watching table 87 // events. 88 broadcaster *events.Broadcaster 89 90 // List of all tickers which needed to be stopped when 91 // cleaning up. 92 tickers []*time.Ticker 93 94 // Reference to the memberlist's keyring to add & remove keys 95 keyring *memberlist.Keyring 96 97 // bootStrapIP is the list of IPs that can be used to bootstrap 98 // the gossip. 99 bootStrapIP []net.IP 100 101 // lastStatsTimestamp is the last timestamp when the stats got printed 102 lastStatsTimestamp time.Time 103 104 // lastHealthTimestamp is the last timestamp when the health score got printed 105 lastHealthTimestamp time.Time 106} 107 108// PeerInfo represents the peer (gossip cluster) nodes of a network 109type PeerInfo struct { 110 Name string 111 IP string 112} 113 114// PeerClusterInfo represents the peer (gossip cluster) nodes 115type PeerClusterInfo struct { 116 PeerInfo 117} 118 119type node struct { 120 memberlist.Node 121 ltime serf.LamportTime 122 // Number of hours left before the reaper removes the node 123 reapTime time.Duration 124} 125 126// network describes the node/network attachment. 127type network struct { 128 // Network ID 129 id string 130 131 // Lamport time for the latest state of the entry. 132 ltime serf.LamportTime 133 134 // Node leave is in progress. 
135 leaving bool 136 137 // Number of seconds still left before a deleted network entry gets 138 // removed from networkDB 139 reapTime time.Duration 140 141 // The broadcast queue for table event gossip. This is only 142 // initialized for this node's network attachment entries. 143 tableBroadcasts *memberlist.TransmitLimitedQueue 144 145 // Number of gossip messages sent related to this network during the last stats collection period 146 qMessagesSent int 147 148 // Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network. 149 // Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod 150 // interval 151 entriesNumber int 152} 153 154// Config represents the configuration of the networdb instance and 155// can be passed by the caller. 156type Config struct { 157 // NodeID is the node unique identifier of the node when is part of the cluster 158 NodeID string 159 160 // Hostname is the node hostname. 161 Hostname string 162 163 // BindAddr is the IP on which networkdb listens. It can be 164 // 0.0.0.0 to listen on all addresses on the host. 165 BindAddr string 166 167 // AdvertiseAddr is the node's IP address that we advertise for 168 // cluster communication. 169 AdvertiseAddr string 170 171 // BindPort is the local node's port to which we bind to for 172 // cluster communication. 173 BindPort int 174 175 // Keys to be added to the Keyring of the memberlist. Key at index 176 // 0 is the primary key 177 Keys [][]byte 178 179 // PacketBufferSize is the maximum number of bytes that memberlist will 180 // put in a packet (this will be for UDP packets by default with a NetTransport). 181 // A safe value for this is typically 1400 bytes (which is the default). However, 182 // depending on your network's MTU (Maximum Transmission Unit) you may 183 // be able to increase this to get more content into each gossip packet. 
184 PacketBufferSize int 185 186 // reapEntryInterval duration of a deleted entry before being garbage collected 187 reapEntryInterval time.Duration 188 189 // reapNetworkInterval duration of a delted network before being garbage collected 190 // NOTE this MUST always be higher than reapEntryInterval 191 reapNetworkInterval time.Duration 192 193 // StatsPrintPeriod the period to use to print queue stats 194 // Default is 5min 195 StatsPrintPeriod time.Duration 196 197 // HealthPrintPeriod the period to use to print the health score 198 // Default is 1min 199 HealthPrintPeriod time.Duration 200} 201 202// entry defines a table entry 203type entry struct { 204 // node from which this entry was learned. 205 node string 206 207 // Lamport time for the most recent update to the entry 208 ltime serf.LamportTime 209 210 // Opaque value store in the entry 211 value []byte 212 213 // Deleting the entry is in progress. All entries linger in 214 // the cluster for certain amount of time after deletion. 215 deleting bool 216 217 // Number of seconds still left before a deleted table entry gets 218 // removed from networkDB 219 reapTime time.Duration 220} 221 222// DefaultConfig returns a NetworkDB config with default values 223func DefaultConfig() *Config { 224 hostname, _ := os.Hostname() 225 return &Config{ 226 NodeID: stringid.TruncateID(stringid.GenerateRandomID()), 227 Hostname: hostname, 228 BindAddr: "0.0.0.0", 229 PacketBufferSize: 1400, 230 StatsPrintPeriod: 5 * time.Minute, 231 HealthPrintPeriod: 1 * time.Minute, 232 reapEntryInterval: 30 * time.Minute, 233 } 234} 235 236// New creates a new instance of NetworkDB using the Config passed by 237// the caller. 238func New(c *Config) (*NetworkDB, error) { 239 // The garbage collection logic for entries leverage the presence of the network. 
240 // For this reason the expiration time of the network is put slightly higher than the entry expiration so that 241 // there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network. 242 c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod 243 244 nDB := &NetworkDB{ 245 config: c, 246 indexes: make(map[int]*radix.Tree), 247 networks: make(map[string]map[string]*network), 248 nodes: make(map[string]*node), 249 failedNodes: make(map[string]*node), 250 leftNodes: make(map[string]*node), 251 networkNodes: make(map[string][]string), 252 bulkSyncAckTbl: make(map[string]chan struct{}), 253 broadcaster: events.NewBroadcaster(), 254 } 255 256 nDB.indexes[byTable] = radix.New() 257 nDB.indexes[byNetwork] = radix.New() 258 259 logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c) 260 if err := nDB.clusterInit(); err != nil { 261 return nil, err 262 } 263 264 return nDB, nil 265} 266 267// Join joins this NetworkDB instance with a list of peer NetworkDB 268// instances passed by the caller in the form of addr:port 269func (nDB *NetworkDB) Join(members []string) error { 270 nDB.Lock() 271 nDB.bootStrapIP = make([]net.IP, 0, len(members)) 272 for _, m := range members { 273 nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m)) 274 } 275 nDB.Unlock() 276 return nDB.clusterJoin(members) 277} 278 279// Close destroys this NetworkDB instance by leave the cluster, 280// stopping timers, canceling goroutines etc. 281func (nDB *NetworkDB) Close() { 282 if err := nDB.clusterLeave(); err != nil { 283 logrus.Errorf("%v(%v) Could not close DB: %v", nDB.config.Hostname, nDB.config.NodeID, err) 284 } 285 286 //Avoid (*Broadcaster).run goroutine leak 287 nDB.broadcaster.Close() 288} 289 290// ClusterPeers returns all the gossip cluster peers. 
291func (nDB *NetworkDB) ClusterPeers() []PeerInfo { 292 nDB.RLock() 293 defer nDB.RUnlock() 294 peers := make([]PeerInfo, 0, len(nDB.nodes)) 295 for _, node := range nDB.nodes { 296 peers = append(peers, PeerInfo{ 297 Name: node.Name, 298 IP: node.Node.Addr.String(), 299 }) 300 } 301 return peers 302} 303 304// Peers returns the gossip peers for a given network. 305func (nDB *NetworkDB) Peers(nid string) []PeerInfo { 306 nDB.RLock() 307 defer nDB.RUnlock() 308 peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid])) 309 for _, nodeName := range nDB.networkNodes[nid] { 310 if node, ok := nDB.nodes[nodeName]; ok { 311 peers = append(peers, PeerInfo{ 312 Name: node.Name, 313 IP: node.Addr.String(), 314 }) 315 } else { 316 // Added for testing purposes, this condition should never happen else mean that the network list 317 // is out of sync with the node list 318 peers = append(peers, PeerInfo{Name: nodeName, IP: "unknown"}) 319 } 320 } 321 return peers 322} 323 324// GetEntry retrieves the value of a table entry in a given (network, 325// table, key) tuple 326func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) { 327 nDB.RLock() 328 defer nDB.RUnlock() 329 entry, err := nDB.getEntry(tname, nid, key) 330 if err != nil { 331 return nil, err 332 } 333 if entry != nil && entry.deleting { 334 return nil, types.NotFoundErrorf("entry in table %s network id %s and key %s deleted and pending garbage collection", tname, nid, key) 335 } 336 337 return entry.value, nil 338} 339 340func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { 341 e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) 342 if !ok { 343 return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key) 344 } 345 346 return e.(*entry), nil 347} 348 349// CreateEntry creates a table entry in NetworkDB for given (network, 350// table, key) tuple and if the NetworkDB is part of the cluster 351// propagates 
this event to the cluster. It is an error to create an 352// entry for the same tuple for which there is already an existing 353// entry unless the current entry is deleting state. 354func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { 355 nDB.Lock() 356 oldEntry, err := nDB.getEntry(tname, nid, key) 357 if err == nil || (oldEntry != nil && !oldEntry.deleting) { 358 nDB.Unlock() 359 return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key) 360 } 361 362 entry := &entry{ 363 ltime: nDB.tableClock.Increment(), 364 node: nDB.config.NodeID, 365 value: value, 366 } 367 368 nDB.createOrUpdateEntry(nid, tname, key, entry) 369 nDB.Unlock() 370 371 if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil { 372 return fmt.Errorf("cannot send create event for table %s, %v", tname, err) 373 } 374 375 return nil 376} 377 378// UpdateEntry updates a table entry in NetworkDB for given (network, 379// table, key) tuple and if the NetworkDB is part of the cluster 380// propagates this event to the cluster. It is an error to update a 381// non-existent entry. 
382func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error { 383 nDB.Lock() 384 if _, err := nDB.getEntry(tname, nid, key); err != nil { 385 nDB.Unlock() 386 return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key) 387 } 388 389 entry := &entry{ 390 ltime: nDB.tableClock.Increment(), 391 node: nDB.config.NodeID, 392 value: value, 393 } 394 395 nDB.createOrUpdateEntry(nid, tname, key, entry) 396 nDB.Unlock() 397 398 if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil { 399 return fmt.Errorf("cannot send table update event: %v", err) 400 } 401 402 return nil 403} 404 405// TableElem elem 406type TableElem struct { 407 Value []byte 408 owner string 409} 410 411// GetTableByNetwork walks the networkdb by the give table and network id and 412// returns a map of keys and values 413func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem { 414 entries := make(map[string]*TableElem) 415 nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool { 416 entry := v.(*entry) 417 if entry.deleting { 418 return false 419 } 420 key := k[strings.LastIndex(k, "/")+1:] 421 entries[key] = &TableElem{Value: entry.value, owner: entry.node} 422 return false 423 }) 424 return entries 425} 426 427// DeleteEntry deletes a table entry in NetworkDB for given (network, 428// table, key) tuple and if the NetworkDB is part of the cluster 429// propagates this event to the cluster. 
430func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { 431 nDB.Lock() 432 oldEntry, err := nDB.getEntry(tname, nid, key) 433 if err != nil || oldEntry == nil || oldEntry.deleting { 434 nDB.Unlock() 435 return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+ 436 "does not exist or is already being deleted", tname, nid, key) 437 } 438 439 entry := &entry{ 440 ltime: nDB.tableClock.Increment(), 441 node: nDB.config.NodeID, 442 value: oldEntry.value, 443 deleting: true, 444 reapTime: nDB.config.reapEntryInterval, 445 } 446 447 nDB.createOrUpdateEntry(nid, tname, key, entry) 448 nDB.Unlock() 449 450 if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil { 451 return fmt.Errorf("cannot send table delete event: %v", err) 452 } 453 454 return nil 455} 456 457func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) { 458 for nid, nodes := range nDB.networkNodes { 459 updatedNodes := make([]string, 0, len(nodes)) 460 for _, node := range nodes { 461 if node == deletedNode { 462 continue 463 } 464 465 updatedNodes = append(updatedNodes, node) 466 } 467 468 nDB.networkNodes[nid] = updatedNodes 469 } 470 471 delete(nDB.networks, deletedNode) 472} 473 474// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes: 475// 1) when a notification is coming of a node leaving the network 476// - Walk all the network entries and mark the leaving node's entries for deletion 477// These will be garbage collected when the reap timer will expire 478// 2) when the local node is leaving the network 479// - Walk all the network entries: 480// A) if the entry is owned by the local node 481// then we will mark it for deletion. This will ensure that if a node did not 482// yet received the notification that the local node is leaving, will be aware 483// of the entries to be deleted. 484// B) if the entry is owned by a remote node, then we can safely delete it. 
//        This ensures that if we join back this network as we receive the CREATE event for
//        entries owned by remote nodes, we will accept them and we notify the application
//
// nid is the network being left and node is the name of the node leaving it.
// The function takes no lock itself.
// NOTE(review): LeaveNetwork calls it with the NetworkDB lock held; presumably
// every caller must do the same — confirm against the callers in other files.
func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
	// Indicates if the delete is triggered for the local node
	isNodeLocal := node == nDB.config.NodeID

	nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
		func(path string, v interface{}) bool {
			oldEntry := v.(*entry)
			// Paths in the byNetwork index have the form /<network>/<table>/<key>.
			// Note that nid is re-declared here from the walked path and shadows
			// the function parameter for the rest of the callback.
			params := strings.Split(path[1:], "/")
			nid := params[0]
			tname := params[1]
			key := params[2]

			// If the entry is owned by a remote node and this node is not leaving the network
			if oldEntry.node != node && !isNodeLocal {
				// Don't do anything because the event is triggered for a node that does not own this entry
				return false
			}

			// If this entry is already marked for deletion and this node is not leaving the network
			if oldEntry.deleting && !isNodeLocal {
				// Don't do anything this entry will be already garbage collected using the old reapTime
				return false
			}

			// Copy of the old entry marked as deleting, with a fresh reap interval.
			entry := &entry{
				ltime:    oldEntry.ltime,
				node:     oldEntry.node,
				value:    oldEntry.value,
				deleting: true,
				reapTime: nDB.config.reapEntryInterval,
			}

			// we arrived at this point in 2 cases:
			// 1) this entry is owned by the node that is leaving the network
			// 2) the local node is leaving the network
			if oldEntry.node == node {
				if isNodeLocal {
					// TODO fcrisciani: this can be removed if there is no way to leave the network
					// without doing a delete of all the objects
					entry.ltime++
				}

				// Entries already marked as deleting are left untouched so that
				// they keep their original reapTime.
				if !oldEntry.deleting {
					nDB.createOrUpdateEntry(nid, tname, key, entry)
				}
			} else {
				// the local node is leaving the network, all the entries of remote nodes can be safely removed
				nDB.deleteEntry(nid, tname, key)
			}

			// Notify to the upper layer only entries not already marked for deletion
			if !oldEntry.deleting {
				nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
			}
			// Returning false keeps the walk going.
			return false
		})
}

// deleteNodeTableEntries walks the whole byTable index and removes, from
// both indexes, every entry whose owner is node. A delete event is written
// to the local broadcaster for each removed entry that was not already
// marked as deleting. The function takes no lock itself.
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
	nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
		oldEntry := v.(*entry)
		if oldEntry.node != node {
			return false
		}

		// Paths in the byTable index have the form /<table>/<network>/<key>.
		params := strings.Split(path[1:], "/")
		tname := params[0]
		nid := params[1]
		key := params[2]

		nDB.deleteEntry(nid, tname, key)

		if !oldEntry.deleting {
			nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
		}
		return false
	})
}

// WalkTable walks a single table in NetworkDB and invokes the passed
// function for each entry in the table passing the network, key,
// value and the deleting flag of the entry. The walk stops if the
// passed function returns a true.
//
// The entries are first collected under the read lock; fn is invoked
// afterwards, without the lock held, in map iteration order. The
// returned error is always nil.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
	nDB.RLock()
	values := make(map[string]interface{})
	nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
		values[path] = v
		return false
	})
	nDB.RUnlock()

	for k, v := range values {
		// k has the form /<table>/<network>/<key>.
		params := strings.Split(k[1:], "/")
		nid := params[1]
		key := params[2]
		if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
			return nil
		}
	}

	return nil
}

// JoinNetwork joins this node to a given network and propagates this
// event across the cluster. This triggers this node joining the
// sub-cluster of this network and participates in the network-scoped
// gossip and bulk sync for this network.
594func (nDB *NetworkDB) JoinNetwork(nid string) error { 595 ltime := nDB.networkClock.Increment() 596 597 nDB.Lock() 598 nodeNetworks, ok := nDB.networks[nDB.config.NodeID] 599 if !ok { 600 nodeNetworks = make(map[string]*network) 601 nDB.networks[nDB.config.NodeID] = nodeNetworks 602 } 603 n, ok := nodeNetworks[nid] 604 var entries int 605 if ok { 606 entries = n.entriesNumber 607 } 608 nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries} 609 nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ 610 NumNodes: func() int { 611 //TODO fcrisciani this can be optimized maybe avoiding the lock? 612 // this call is done each GetBroadcasts call to evaluate the number of 613 // replicas for the message 614 nDB.RLock() 615 defer nDB.RUnlock() 616 return len(nDB.networkNodes[nid]) 617 }, 618 RetransmitMult: 4, 619 } 620 nDB.addNetworkNode(nid, nDB.config.NodeID) 621 networkNodes := nDB.networkNodes[nid] 622 nDB.Unlock() 623 624 if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil { 625 return fmt.Errorf("failed to send leave network event for %s: %v", nid, err) 626 } 627 628 logrus.Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid) 629 if _, err := nDB.bulkSync(networkNodes, true); err != nil { 630 logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) 631 } 632 633 return nil 634} 635 636// LeaveNetwork leaves this node from a given network and propagates 637// this event across the cluster. This triggers this node leaving the 638// sub-cluster of this network and as a result will no longer 639// participate in the network-scoped gossip and bulk sync for this 640// network. 
Also remove all the table entries for this network from 641// networkdb 642func (nDB *NetworkDB) LeaveNetwork(nid string) error { 643 ltime := nDB.networkClock.Increment() 644 if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil { 645 return fmt.Errorf("failed to send leave network event for %s: %v", nid, err) 646 } 647 648 nDB.Lock() 649 defer nDB.Unlock() 650 651 // Remove myself from the list of the nodes participating to the network 652 nDB.deleteNetworkNode(nid, nDB.config.NodeID) 653 654 // Update all the local entries marking them for deletion and delete all the remote entries 655 nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeID) 656 657 nodeNetworks, ok := nDB.networks[nDB.config.NodeID] 658 if !ok { 659 return fmt.Errorf("could not find self node for network %s while trying to leave", nid) 660 } 661 662 n, ok := nodeNetworks[nid] 663 if !ok { 664 return fmt.Errorf("could not find network %s while trying to leave", nid) 665 } 666 667 logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid) 668 n.ltime = ltime 669 n.reapTime = nDB.config.reapNetworkInterval 670 n.leaving = true 671 return nil 672} 673 674// addNetworkNode adds the node to the list of nodes which participate 675// in the passed network only if it is not already present. Caller 676// should hold the NetworkDB lock while calling this 677func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) { 678 nodes := nDB.networkNodes[nid] 679 for _, node := range nodes { 680 if node == nodeName { 681 return 682 } 683 } 684 685 nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName) 686} 687 688// Deletes the node from the list of nodes which participate in the 689// passed network. 
Caller should hold the NetworkDB lock while calling 690// this 691func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) { 692 nodes, ok := nDB.networkNodes[nid] 693 if !ok || len(nodes) == 0 { 694 return 695 } 696 newNodes := make([]string, 0, len(nodes)-1) 697 for _, name := range nodes { 698 if name == nodeName { 699 continue 700 } 701 newNodes = append(newNodes, name) 702 } 703 nDB.networkNodes[nid] = newNodes 704} 705 706// findCommonnetworks find the networks that both this node and the 707// passed node have joined. 708func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { 709 nDB.RLock() 710 defer nDB.RUnlock() 711 712 var networks []string 713 for nid := range nDB.networks[nDB.config.NodeID] { 714 if n, ok := nDB.networks[nodeName][nid]; ok { 715 if !n.leaving { 716 networks = append(networks, nid) 717 } 718 } 719 } 720 721 return networks 722} 723 724func (nDB *NetworkDB) updateLocalNetworkTime() { 725 nDB.Lock() 726 defer nDB.Unlock() 727 728 ltime := nDB.networkClock.Increment() 729 for _, n := range nDB.networks[nDB.config.NodeID] { 730 n.ltime = ltime 731 } 732} 733 734// createOrUpdateEntry this function handles the creation or update of entries into the local 735// tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated) 736func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) { 737 _, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) 738 _, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) 739 if !okNetwork { 740 // Add only if it is an insert not an update 741 n, ok := nDB.networks[nDB.config.NodeID][nid] 742 if ok { 743 n.entriesNumber++ 744 } 745 } 746 return okTable, okNetwork 747} 748 749// deleteEntry this function handles the deletion of entries into the local tree store. 
750// It is also used to keep in sync the entries number of the network (all tables are aggregated) 751func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) { 752 _, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) 753 _, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) 754 if okNetwork { 755 // Remove only if the delete is successful 756 n, ok := nDB.networks[nDB.config.NodeID][nid] 757 if ok { 758 n.entriesNumber-- 759 } 760 } 761 return okTable, okNetwork 762} 763