1package network 2 3import ( 4 "crypto/rand" 5 "encoding/binary" 6 "time" 7 8 "github.com/Arceliar/phony" 9) 10 11const ( 12 treeTIMEOUT = time.Hour // TODO figure out what makes sense 13 treeANNOUNCE = treeTIMEOUT / 2 14 treeTHROTTLE = treeANNOUNCE / 2 // TODO use this to limit how fast seqs can update 15) 16 17/********** 18 * dhtree * 19 **********/ 20 21type dhtree struct { 22 phony.Inbox 23 core *core 24 pathfinder pathfinder 25 expired map[publicKey]treeExpiredInfo // stores root highest seq and when it expires 26 tinfos map[*peer]*treeInfo 27 dinfos map[dhtMapKey]*dhtInfo 28 self *treeInfo // self info 29 parent *peer // peer that sent t.self to us 30 prev *dhtInfo // previous key in dht, who we maintain a path to 31 next *dhtInfo // next in dht, they maintain a path to us 32 dkeys map[*dhtInfo]publicKey // map of *dhtInfo->destKey for current and past prev 33 seq uint64 // updated whenever we send a new setup, technically it doesn't need to increase (it just needs to be different) 34 btimer *time.Timer // time.AfterFunc to send bootstrap packets 35 stimer *time.Timer // time.AfterFunc for self/parent expiration 36 wait bool // FIXME this shouldn't be needed 37} 38 39type treeExpiredInfo struct { 40 seq uint64 // sequence number that expires 41 time time.Time // Time when it expires 42} 43 44func (t *dhtree) init(c *core) { 45 t.core = c 46 t.expired = make(map[publicKey]treeExpiredInfo) 47 t.tinfos = make(map[*peer]*treeInfo) 48 t.dinfos = make(map[dhtMapKey]*dhtInfo) 49 t.dkeys = make(map[*dhtInfo]publicKey) 50 t.seq = uint64(time.Now().UnixNano()) 51 r := make([]byte, 8) 52 if _, err := rand.Read(r); err != nil { 53 panic(err) 54 } 55 for idx := range r { 56 t.seq |= uint64(r[idx]) << 8 * uint64(idx) 57 } 58 t.btimer = time.AfterFunc(0, func() {}) // non-nil until closed 59 t.stimer = time.AfterFunc(0, func() {}) // non-nil until closed 60 t._fix() // Initialize t.self and start announce and timeout timers 61 t.pathfinder.init(t) 62} 63 64func (t *dhtree) _sendTree() { 65 for p := range t.tinfos { 66 p.sendTree(t, t.self) 67 } 68} 69 70// update adds a treeInfo to the spanning tree 71// it then fixes the tree (selecting a new parent, if needed) and the dht (restarting the bootstrap process) 72// if the info is from the current parent, then there's a delay before the tree/dht are fixed 73// that prevents a race where we immediately switch to a new parent, who tries to do the same with us 74// this avoids the tons of traffic generated when nodes race to use each other as parents 75func (t *dhtree) update(from phony.Actor, info *treeInfo, p *peer) { 76 t.Act(from, func() { 77 // The tree info should have been checked before this point 78 info.time = time.Now() // Order by processing time, not receiving time... 79 if exp, isIn := t.expired[info.root]; !isIn || exp.seq < info.seq { 80 t.expired[info.root] = treeExpiredInfo{seq: info.seq, time: info.time} 81 } 82 if t.tinfos[p] == nil { 83 // The peer may have missed an update due to a race between creating the peer and now 84 // The easiest way to fix the problem is to just send it another update right now 85 p.sendTree(t, t.self) 86 } 87 t.tinfos[p] = info 88 if p == t.parent { 89 if t.wait { 90 panic("this should never happen") 91 } 92 var doWait bool 93 if treeLess(t.self.root, info.root) { 94 doWait = true // worse root 95 } else if info.root.equal(t.self.root) && info.seq == t.self.seq { 96 doWait = true // same root and seq 97 } 98 t.self, t.parent = nil, nil // The old self/parent are now invalid 99 if doWait { 100 // FIXME this is a hack 101 // We seem to busyloop if we process parent updates immediately 102 // E.g. we get bad news and immediately switch to a different peer 103 // Then we get more bad news and switch again, etc... 104 // Set self to root, send, then process things correctly 1 second later 105 t.wait = true 106 t.self = &treeInfo{root: t.core.crypto.publicKey} 107 t._sendTree() // send bad news immediately 108 time.AfterFunc(time.Second, func() { 109 t.Act(nil, func() { 110 t.wait = false 111 t.self, t.parent = nil, nil 112 t._fix() 113 t._doBootstrap() 114 }) 115 }) 116 } 117 } 118 if !t.wait { 119 t._fix() 120 t._doBootstrap() 121 } 122 }) 123} 124 125// remove removes a peer from the tree, along with any paths through that peer in the dht 126func (t *dhtree) remove(from phony.Actor, p *peer) { 127 t.Act(from, func() { 128 oldInfo := t.tinfos[p] 129 delete(t.tinfos, p) 130 if t.self == oldInfo { 131 t.self = nil 132 t.parent = nil 133 t._fix() 134 } 135 for _, dinfo := range t.dinfos { 136 if dinfo.peer == p || dinfo.rest == p { 137 t._teardown(p, dinfo.getTeardown()) 138 } 139 } 140 }) 141} 142 143// _fix selects the best parent (and is called in response to receiving a tree update) 144// if this is not the same as our current parent, then it sends a tree update to our peers and resets our prev/next in the dht 145func (t *dhtree) _fix() { 146 if t.stimer == nil { 147 return // closed 148 } 149 oldSelf := t.self 150 if t.self == nil || treeLess(t.core.crypto.publicKey, t.self.root) { 151 // Note that seq needs to be non-decreasing for the node to function as a root 152 // a timestamp it used to partly mitigate rollbacks from restarting 153 t.self = &treeInfo{ 154 root: t.core.crypto.publicKey, 155 seq: uint64(time.Now().Unix()), 156 time: time.Now(), 157 } 158 t.parent = nil 159 } 160 for _, info := range t.tinfos { 161 // Refill expired to include non-root nodes (in case we're replacing an expired 162 if exp, isIn := t.expired[info.root]; !isIn || exp.seq < info.seq || exp.seq == info.seq && info.time.Before(exp.time) { 163 // Fill expired as we 164 t.expired[info.root] = treeExpiredInfo{seq: info.seq, time: info.time} 165 } 166 } 167 for p, info := range t.tinfos { 168 if exp, isIn := t.expired[info.root]; isIn { 169 if info.seq < exp.seq { 170 continue // skip old sequence numbers 171 } else if info.seq == exp.seq && time.Since(exp.time) > treeTIMEOUT { 172 continue // skip expired sequence numbers 173 } 174 } 175 switch { 176 case !info.checkLoops(): 177 // This has a loop, e.g. it's from a child, so skip it 178 case treeLess(info.root, t.self.root): 179 // This is a better root 180 t.self, t.parent = info, p 181 case treeLess(t.self.root, info.root): 182 // This is a worse root, so don't do anything with it 183 case info.seq > t.self.seq: 184 // This is a newer sequence number, so update parent 185 t.self, t.parent = info, p 186 case info.seq < t.self.seq: 187 // This is an older sequnce number, so ignore it 188 case info.time.Before(t.self.time): 189 // This info has been around for longer (e.g. the path is more stable) 190 t.self, t.parent = info, p 191 case info.time.After(t.self.time): 192 // This info has been around for less time (e.g. the path is less stable) 193 // Note that everything after this is extremely unlikely to be reached... 194 case len(info.hops) < len(t.self.hops): 195 // This is a shorter path to the root 196 t.self, t.parent = info, p 197 case len(info.hops) > len(t.self.hops): 198 // This is a longer path to the root, so don't do anything 199 case treeLess(info.from(), t.self.from()): 200 // This peer has a higher key than our current parent 201 t.self, t.parent = info, p 202 } 203 } 204 if t.self != oldSelf { 205 // Reset a timer to make t.self expire at some point 206 t.stimer.Stop() 207 self := t.self 208 var delay time.Duration 209 if t.self.root.equal(t.core.crypto.publicKey) { 210 // We are the root, so we need to expire after treeANNOUNCE to update seq 211 delay = treeANNOUNCE 212 } else { 213 // Figure out when the root needs to time out 214 stopTime := t.expired[t.self.root].time.Add(treeTIMEOUT) 215 delay = time.Until(stopTime) 216 } 217 t.stimer = time.AfterFunc(delay, func() { 218 t.Act(nil, func() { 219 if t.self == self { 220 t.self = nil 221 t.parent = nil 222 t._fix() 223 t._doBootstrap() 224 } 225 }) 226 }) 227 t._sendTree() // Send the tree update to our peers 228 } 229 // Clean up t.expired (remove anything worse than the current root) 230 for skey := range t.expired { 231 key := publicKey(skey) 232 if key.equal(t.self.root) || treeLess(t.self.root, key) { 233 delete(t.expired, skey) 234 } 235 } 236} 237 238// _treeLookup selects the best next hop (in treespace) for the destination 239func (t *dhtree) _treeLookup(dest *treeLabel) *peer { 240 if t.core.crypto.publicKey.equal(dest.key) { 241 return nil 242 } 243 best := t.self 244 bestDist := best.dist(dest) 245 var bestPeer *peer 246 for p, info := range t.tinfos { 247 if !info.root.equal(dest.root) || info.seq != dest.seq { 248 continue 249 } 250 tmp := *info 251 tmp.hops = tmp.hops[:len(tmp.hops)-1] 252 dist := tmp.dist(dest) 253 var isBetter bool 254 switch { 255 case dist < bestDist: 256 isBetter = true 257 case dist > bestDist: 258 case treeLess(info.from(), best.from()): 259 isBetter = true 260 } 261 if isBetter { 262 best = info 263 bestDist = dist 264 bestPeer = p 265 } 266 } 267 if !best.root.equal(dest.root) || best.seq != dest.seq { // TODO? check self, not next/dest? 268 // Dead end, so stay here 269 return nil 270 } 271 return bestPeer 272} 273 274// _dhtLookup selects the next hop needed to route closer to the destination in dht keyspace 275// this only uses the source direction of paths through the dht 276// bootstraps use slightly different logic, since they need to stop short of the destination key 277func (t *dhtree) _dhtLookup(dest publicKey, isBootstrap bool) *peer { 278 // Start by defining variables and helper functions 279 best := t.core.crypto.publicKey 280 var bestPeer *peer 281 var bestInfo *dhtInfo 282 // doUpdate is just to make sure we don't forget to update something 283 doUpdate := func(key publicKey, p *peer, d *dhtInfo) { 284 best, bestPeer, bestInfo = key, p, d 285 } 286 // doCheckedUpdate checks if the provided key is better than the current best, and updates if so 287 doCheckedUpdate := func(key publicKey, p *peer, d *dhtInfo) { 288 switch { 289 case !isBootstrap && key.equal(dest) && !best.equal(dest): 290 fallthrough 291 case dhtOrdered(best, key, dest): 292 doUpdate(key, p, nil) 293 } 294 } 295 // doAncestry updates based on the ancestry information in a treeInfo 296 doAncestry := func(info *treeInfo, p *peer) { 297 doCheckedUpdate(info.root, p, nil) // updates if the root is better 298 for _, hop := range info.hops { 299 doCheckedUpdate(hop.next, p, nil) // updates if this hop is better 300 tinfo := t.tinfos[bestPeer] // may be nil if we're in the middle of a remove 301 if tinfo != nil && best.equal(hop.next) && info.time.Before(tinfo.time) { 302 // This ancestor matches our current next hop, but this peer's treeInfo is better, so switch to it 303 doUpdate(hop.next, p, nil) 304 } 305 } 306 } 307 // doDHT updates best based on a DHT path 308 doDHT := func(info *dhtInfo) { 309 doCheckedUpdate(info.key, info.peer, info) // updates if the source is better 310 if bestInfo != nil && info.key.equal(bestInfo.key) { 311 if treeLess(info.root, bestInfo.root) { 312 doUpdate(info.key, info.peer, info) // same source, but the root is better 313 } else if info.root.equal(bestInfo.root) && info.rootSeq > bestInfo.rootSeq { 314 doUpdate(info.key, info.peer, info) // same source, same root, but the rootSeq is newer 315 } 316 } 317 } 318 // Update the best key and peer 319 // First check if the current best (ourself) is an invalid next hop 320 if (isBootstrap && best.equal(dest)) || dhtOrdered(t.self.root, dest, best) { 321 // We're the current best, and we're already too far through keyspace 322 // That means we need to default to heading towards the root 323 doUpdate(t.self.root, t.parent, nil) 324 } 325 // Update based on the ancestry of our own treeInfo 326 doAncestry(t.self, t.parent) 327 // Update based on the ancestry of our peers 328 for p, info := range t.tinfos { 329 doAncestry(info, p) 330 } 331 // Check peers 332 for p := range t.tinfos { 333 if best.equal(p.key) { 334 // The best next hop is one of our peers 335 // We may have stumbled upon them too early, as the ancestor of another peer 336 // Switch to using the direct route to this peer, just in case 337 doUpdate(p.key, p, nil) 338 } 339 } 340 // Update based on our DHT infos 341 for _, info := range t.dinfos { 342 doDHT(info) 343 } 344 return bestPeer 345} 346 347// _dhtAdd adds a dhtInfo to the dht and returns true 348// it may return false if the path associated with the dhtInfo isn't allowed for some reason 349// e.g. we know a better prev/next for one of the nodes in the path, which can happen if there's multiple split rings that haven't converged on their own yet 350// as of writing, that never happens, it always adds and returns true 351func (t *dhtree) _dhtAdd(info *dhtInfo) bool { 352 // TODO? check existing paths, don't allow this one if the source/dest pair makes no sense 353 t.dinfos[info.getMapKey()] = info 354 return true 355} 356 357// _newBootstrap returns a *dhtBootstrap for this node, using t.self, with a signature 358func (t *dhtree) _newBootstrap() *dhtBootstrap { 359 dbs := new(dhtBootstrap) 360 dbs.label = *t._getLabel() 361 return dbs 362} 363 364// _handleBootstrap takes a bootstrap packet and checks if we know of a better prev for the source node 365// if yes, then we forward to the next hop in the path towards that prev 366// if no, then we reply with a dhtBootstrapAck (unless sanity checks fail) 367func (t *dhtree) _handleBootstrap(bootstrap *dhtBootstrap) { 368 source := bootstrap.label.key 369 if next := t._dhtLookup(source, true); next != nil { 370 next.sendBootstrap(t, bootstrap) 371 return 372 } else if source.equal(t.core.crypto.publicKey) { 373 return 374 } else if !bootstrap.check() { 375 return 376 } 377 ack := new(dhtBootstrapAck) 378 ack.bootstrap = *bootstrap 379 ack.response = *t._getToken(source) 380 t._handleBootstrapAck(ack) 381} 382 383// handleBootstrap is the externally callable actor behavior that sends a message to the dhtree that it should _handleBootstrap 384func (t *dhtree) handleBootstrap(from phony.Actor, bootstrap *dhtBootstrap) { 385 t.Act(from, func() { 386 t._handleBootstrap(bootstrap) 387 }) 388} 389 390// _handleBootstrapAck takes an ack packet and checks if we know a next hop on the tree 391// if yes, then we forward to the next hop 392// if no, then we decide whether or not this node is better than our current prev 393// if yes, then we get rid of our current prev (if any) and start setting up a new path to the response node in the ack 394// if no, then we drop the bootstrap acknowledgement without doing anything 395func (t *dhtree) _handleBootstrapAck(ack *dhtBootstrapAck) { 396 source := ack.response.dest.key 397 next := t._treeLookup(&ack.bootstrap.label) 398 switch { 399 case next != nil: 400 next.sendBootstrapAck(t, ack) 401 return 402 case t.core.crypto.publicKey.equal(source): 403 // This is our own ack, but we failed to find a next hop 404 return 405 case !t.core.crypto.publicKey.equal(ack.bootstrap.label.key): 406 // This isn't an ack of our own bootstrap 407 return 408 case !t.core.crypto.publicKey.equal(ack.response.source): 409 // This is an ack of or own bootstrap, but the token isn't for us 410 return 411 case !ack.response.dest.root.equal(t.self.root): 412 // We have a different root, so tree lookups would fail 413 return 414 case ack.response.dest.seq != t.self.seq: 415 // This response is too old, so path setup would fail 416 return 417 case t.prev == nil: 418 // We have no prev, so anything matching the above is good enough 419 case dhtOrdered(t.dkeys[t.prev], source, t.core.crypto.publicKey): 420 // This is from a better prev than our current one 421 case !source.equal(t.dkeys[t.prev]): 422 // This isn't from the current prev or better, so ignore it 423 return 424 case !t.prev.root.equal(t.self.root) || t.prev.rootSeq != t.self.seq: 425 // The curent prev needs replacing (old tree info) 426 default: 427 // We already have a better (FIXME? or equal) prev 428 return 429 } 430 if !ack.response.check() { 431 // Final thing to check, if the signatures are bad then ignore it 432 return 433 } 434 t.prev = nil 435 for _, dinfo := range t.dinfos { 436 // Former prev need to be notified that we're no longer next 437 // The only way to signal that is by tearing down the path 438 // We may have multiple former prev paths 439 // From t.prev = nil when the tree changes, but kept around to bootstrap 440 // So loop over paths and close any going to a *different* node than the current prev 441 // The current prev can close the old path from that side after setup 442 if dest, isIn := t.dkeys[dinfo]; isIn && !dest.equal(source) { 443 t._teardown(nil, dinfo.getTeardown()) 444 } 445 } 446 setup := t._newSetup(&ack.response) 447 t._handleSetup(nil, setup) 448 if t.prev == nil { 449 // This can happen if the treeLookup in handleSetup fails 450 // FIXME we should avoid letting this happen 451 // E.g. check that the lookup will fail, or at least that the roots match 452 } 453} 454 455// handleBootstrapAck is the externally callable actor behavior that sends a message to the dhtree that it should _handleBootstrapAck 456func (t *dhtree) handleBootstrapAck(from phony.Actor, ack *dhtBootstrapAck) { 457 t.Act(from, func() { 458 t._handleBootstrapAck(ack) 459 }) 460} 461 462// _newSetup returns a *dhtSetup for this node, with a new sequence number and signature 463func (t *dhtree) _newSetup(token *dhtSetupToken) *dhtSetup { 464 t.seq++ 465 setup := new(dhtSetup) 466 setup.seq = t.seq 467 setup.token = *token 468 setup.sig = t.core.crypto.privateKey.sign(setup.bytesForSig()) 469 return setup 470} 471 472// _handleSetup checks if it's safe to add a path from the setup source to the setup destination 473// if we can't add it (due to no next hop to forward it to, or if we're the destination but we already have a better next, or if we already have a path from the same source node), then we send a teardown to remove the path from the network 474// otherwise, we add the path to our table, and forward it (if we're not the destination) or set it as our next path (if we are, tearing down our existing next if one exists) 475func (t *dhtree) _handleSetup(prev *peer, setup *dhtSetup) { 476 next := t._treeLookup(&setup.token.dest) 477 dest := setup.token.dest.key 478 if next == nil && !dest.equal(t.core.crypto.publicKey) { 479 // FIXME? this has problems if prev is self (from changes to tree state?) 480 if prev != nil { 481 prev.sendTeardown(t, setup.getTeardown()) 482 } 483 return 484 } 485 dinfo := new(dhtInfo) 486 dinfo.seq = setup.seq 487 dinfo.key = setup.token.source 488 dinfo.peer = prev 489 dinfo.rest = next 490 dinfo.root = setup.token.dest.root 491 dinfo.rootSeq = setup.token.dest.seq 492 if !dinfo.root.equal(t.self.root) || dinfo.rootSeq != t.self.seq { 493 // Wrong root or mismatched seq 494 if prev != nil { 495 prev.sendTeardown(t, setup.getTeardown()) 496 } 497 return 498 } 499 if _, isIn := t.dinfos[dinfo.getMapKey()]; isIn { 500 // Already have a path from this source 501 if prev != nil { 502 prev.sendTeardown(t, setup.getTeardown()) 503 } 504 return 505 } 506 if !t._dhtAdd(dinfo) { 507 if prev != nil { 508 prev.sendTeardown(t, setup.getTeardown()) 509 } 510 } 511 dinfo.timer = time.AfterFunc(2*treeTIMEOUT, func() { 512 t.Act(nil, func() { 513 // Clean up path if it has timed out 514 if info, isIn := t.dinfos[dinfo.getMapKey()]; isIn { 515 if info.peer != nil { 516 info.peer.sendTeardown(t, info.getTeardown()) 517 } 518 t._teardown(info.peer, info.getTeardown()) 519 } 520 }) 521 }) 522 if prev == nil { 523 // sanity checks, this should only happen when setting up our prev 524 if !setup.token.source.equal(t.core.crypto.publicKey) { 525 panic("wrong source") 526 } else if setup.seq != t.seq { 527 panic("wrong seq") 528 } else if t.prev != nil { 529 panic("already have a prev") 530 } 531 t.prev = dinfo 532 t.dkeys[dinfo] = dest 533 } 534 if next != nil { 535 next.sendSetup(t, setup) 536 } else { 537 if t.next != nil { 538 // TODO get this right! 539 // We need to replace the old next in most cases 540 // The exceptions are when: 541 // 1. The dinfo's root/seq don't match our current root/seq 542 // 2. The dinfo matches, but so does t.next, and t.next is better 543 // What happens when the dinfo matches, t.next does not, but t.next is still better?... 544 // Just doing something for now (replace next) but not sure that's right... 545 var doUpdate bool 546 if !dinfo.root.equal(t.self.root) || dinfo.rootSeq != t.self.seq { 547 // The root/seq is bad, so don't update 548 } else if dinfo.key.equal(t.next.key) { 549 // It's an update from the current next 550 doUpdate = true 551 } else if dhtOrdered(t.core.crypto.publicKey, dinfo.key, t.next.key) { 552 // It's an update from a better next 553 doUpdate = true 554 } 555 if doUpdate { 556 t._teardown(nil, t.next.getTeardown()) 557 t.next = dinfo 558 } else { 559 t._teardown(nil, dinfo.getTeardown()) 560 } 561 } else { 562 t.next = dinfo 563 } 564 } 565} 566 567// handleSetup is the dhtree actor behavior that sends a message to _handleSetup 568func (t *dhtree) handleSetup(from phony.Actor, prev *peer, setup *dhtSetup) { 569 t.Act(from, func() { 570 t._handleSetup(prev, setup) 571 }) 572} 573 574// _teardown removes the path associated with the teardown from our dht and forwards it to the next hop along that path (or does nothing if the teardown doesn't match a known path) 575func (t *dhtree) _teardown(from *peer, teardown *dhtTeardown) { 576 if dinfo, isIn := t.dinfos[teardown.getMapKey()]; isIn { 577 if teardown.seq != dinfo.seq { 578 return 579 } else if !teardown.key.equal(dinfo.key) { 580 panic("this should never happen") 581 } 582 var next *peer 583 if from == dinfo.peer { 584 next = dinfo.rest 585 } else if from == dinfo.rest { 586 next = dinfo.peer 587 } else { 588 return //panic("DEBUG teardown of path from wrong node") 589 } 590 dinfo.timer.Stop() 591 delete(t.dkeys, dinfo) 592 delete(t.dinfos, teardown.getMapKey()) 593 if next != nil { 594 next.sendTeardown(t, teardown) 595 } 596 if t.next == dinfo { 597 t.next = nil 598 } 599 if t.prev == dinfo { 600 t.prev = nil 601 t._doBootstrap() 602 } 603 } 604} 605 606// teardown is the dhtinfo actor behavior that sends a message to _teardown 607func (t *dhtree) teardown(from phony.Actor, p *peer, teardown *dhtTeardown) { 608 t.Act(from, func() { 609 t._teardown(p, teardown) 610 }) 611} 612 613// _doBootstrap decides whether or not to send a bootstrap packet 614// if a bootstrap is sent, then it sets things up to attempt to send another bootstrap at a later point 615func (t *dhtree) _doBootstrap() { 616 //return // FIXME debug tree (root offline -> too much traffic to fix) 617 if t.btimer != nil { 618 if t.prev != nil && t.prev.root.equal(t.self.root) && t.prev.rootSeq == t.self.seq { 619 return 620 } 621 t._handleBootstrap(t._newBootstrap()) 622 t.btimer.Stop() 623 t.btimer = time.AfterFunc(time.Second, func() { t.Act(nil, t._doBootstrap) }) 624 } 625} 626 627// handleDHTTraffic take a dht traffic packet (still marshaled as []bytes) and decides where to forward it to next to take it closer to its destination in keyspace 628// if there's nowhere better to send it, then it hands it off to be read out from the local PacketConn interface 629func (t *dhtree) handleDHTTraffic(from phony.Actor, tr *dhtTraffic, doNotify bool) { 630 t.Act(from, func() { 631 next := t._dhtLookup(tr.dest, false) 632 if next == nil { 633 if tr.dest.equal(t.core.crypto.publicKey) { 634 dest := tr.source 635 t.pathfinder._doNotify(dest, !doNotify) 636 } 637 t.core.pconn.handleTraffic(tr) 638 } else { 639 next.sendDHTTraffic(t, tr) 640 } 641 }) 642} 643 644func (t *dhtree) sendTraffic(from phony.Actor, tr *dhtTraffic) { 645 t.Act(from, func() { 646 if path := t.pathfinder._getPath(tr.dest); path != nil { 647 pt := new(pathTraffic) 648 pt.path = path 649 pt.dt = *tr 650 t.core.peers.handlePathTraffic(t, pt) 651 } else { 652 t.handleDHTTraffic(nil, tr, false) 653 } 654 }) 655} 656 657func (t *dhtree) _getLabel() *treeLabel { 658 // TODO do this once when t.self changes and save it somewhere 659 // (to avoid repeated signing every time we call this) 660 // Fill easy fields of label 661 label := new(treeLabel) 662 label.key = t.core.crypto.publicKey 663 label.root = t.self.root 664 label.seq = t.self.seq 665 for _, hop := range t.self.hops { 666 label.path = append(label.path, hop.port) 667 } 668 label.sig = t.core.crypto.privateKey.sign(label.bytesForSig()) 669 return label 670} 671 672func (t *dhtree) _getToken(source publicKey) *dhtSetupToken { 673 token := new(dhtSetupToken) 674 token.source = source 675 token.dest = *t._getLabel() 676 token.sig = t.core.crypto.privateKey.sign(token.bytesForSig()) 677 return token 678} 679 680/************ 681 * treeInfo * 682 ************/ 683 684type treeInfo struct { 685 time time.Time // Note: *NOT* serialized 686 root publicKey 687 seq uint64 688 hops []treeHop 689} 690 691type treeHop struct { 692 next publicKey 693 port peerPort 694 sig signature 695} 696 697func (info *treeInfo) dest() publicKey { 698 key := info.root 699 if len(info.hops) > 0 { 700 key = info.hops[len(info.hops)-1].next 701 } 702 return key 703} 704 705func (info *treeInfo) from() publicKey { 706 key := info.root 707 if len(info.hops) > 1 { 708 // last hop is to this node, 2nd to last is to the previous hop, which is who this is from 709 key = info.hops[len(info.hops)-2].next 710 } 711 return key 712} 713 714func (info *treeInfo) checkSigs() bool { 715 if len(info.hops) == 0 { 716 return false 717 } 718 var bs []byte 719 key := info.root 720 bs = append(bs, info.root[:]...) 721 seq := make([]byte, 8) 722 binary.BigEndian.PutUint64(seq, info.seq) 723 bs = append(bs, seq...) 724 for _, hop := range info.hops { 725 bs = append(bs, hop.next[:]...) 726 bs = wireEncodeUint(bs, uint64(hop.port)) 727 if !key.verify(bs, &hop.sig) { 728 return false 729 } 730 key = hop.next 731 } 732 return true 733} 734 735func (info *treeInfo) checkLoops() bool { 736 key := info.root 737 keys := make(map[publicKey]bool) // Used to avoid loops 738 for _, hop := range info.hops { 739 if keys[key] { 740 return false 741 } 742 keys[key] = true 743 key = hop.next 744 } 745 return !keys[key] 746} 747 748func (info *treeInfo) add(priv privateKey, next *peer) *treeInfo { 749 var bs []byte 750 bs = append(bs, info.root[:]...) 751 seq := make([]byte, 8) 752 binary.BigEndian.PutUint64(seq, info.seq) 753 bs = append(bs, seq...) 754 for _, hop := range info.hops { 755 bs = append(bs, hop.next[:]...) 756 bs = wireEncodeUint(bs, uint64(hop.port)) 757 } 758 bs = append(bs, next.key[:]...) 759 bs = wireEncodeUint(bs, uint64(next.port)) 760 sig := priv.sign(bs) 761 hop := treeHop{next: next.key, port: next.port, sig: sig} 762 newInfo := *info 763 newInfo.hops = nil 764 newInfo.hops = append(newInfo.hops, info.hops...) 765 newInfo.hops = append(newInfo.hops, hop) 766 return &newInfo 767} 768 769func (info *treeInfo) dist(dest *treeLabel) int { 770 if !info.root.equal(dest.root) { 771 // TODO? also check the root sequence number? 772 return int(^(uint(0)) >> 1) // max int, but you should really check this first 773 } 774 a, b := len(info.hops), len(dest.path) 775 if b < a { 776 a, b = b, a // make 'a' be the smaller value 777 } 778 lcaIdx := -1 // last common ancestor 779 for idx := 0; idx < a; idx++ { 780 if info.hops[idx].port != dest.path[idx] { 781 break 782 } 783 lcaIdx = idx 784 } 785 return a + b - 2*lcaIdx 786} 787 788func (info *treeInfo) encode(out []byte) ([]byte, error) { 789 out = append(out, info.root[:]...) 790 seq := make([]byte, 8) 791 binary.BigEndian.PutUint64(seq, info.seq) 792 out = append(out, seq...) 793 for _, hop := range info.hops { 794 out = append(out, hop.next[:]...) 795 out = wireEncodeUint(out, uint64(hop.port)) 796 out = append(out, hop.sig[:]...) 797 } 798 return out, nil 799} 800 801func (info *treeInfo) decode(data []byte) error { 802 nfo := treeInfo{} 803 if !wireChopSlice(nfo.root[:], &data) { 804 return wireDecodeError 805 } 806 if len(data) >= 8 { 807 nfo.seq = binary.BigEndian.Uint64(data[:8]) 808 data = data[8:] 809 } else { 810 return wireDecodeError 811 } 812 for len(data) > 0 { 813 hop := treeHop{} 814 switch { 815 case !wireChopSlice(hop.next[:], &data): 816 return wireDecodeError 817 case !wireChopUint((*uint64)(&hop.port), &data): 818 return wireDecodeError 819 case !wireChopSlice(hop.sig[:], &data): 820 return wireDecodeError 821 } 822 nfo.hops = append(nfo.hops, hop) 823 } 824 //nfo.time = time.Now() // Set by the dhtree in update 825 *info = nfo 826 return nil 827} 828 829/************* 830 * treeLabel * 831 *************/ 832 833type treeLabel struct { 834 sig signature 835 key publicKey 836 root publicKey 837 seq uint64 838 path []peerPort 839} 840 841func (l *treeLabel) bytesForSig() []byte { 842 var bs []byte 843 bs = append(bs, l.root[:]...) 844 seq := make([]byte, 8) 845 binary.BigEndian.PutUint64(seq, l.seq) 846 bs = append(bs, seq...) 847 bs = wireEncodePath(bs, l.path) 848 return bs 849} 850 851func (l *treeLabel) check() bool { 852 bs := l.bytesForSig() 853 return l.key.verify(bs, &l.sig) 854} 855 856func (l *treeLabel) encode(out []byte) ([]byte, error) { 857 out = append(out, l.sig[:]...) 858 out = append(out, l.key[:]...) 859 out = append(out, l.root[:]...) 860 seq := make([]byte, 8) 861 binary.BigEndian.PutUint64(seq, l.seq) 862 out = append(out, seq...) 863 out = wireEncodePath(out, l.path) 864 return out, nil 865} 866 867func (l *treeLabel) decode(data []byte) error { 868 var tmp treeLabel 869 if !wireChopSlice(tmp.sig[:], &data) { 870 return wireDecodeError 871 } else if !wireChopSlice(tmp.key[:], &data) { 872 return wireDecodeError 873 } else if !wireChopSlice(tmp.root[:], &data) { 874 return wireDecodeError 875 } else if len(data) < 8 { 876 return wireDecodeError 877 } else { 878 tmp.seq = binary.BigEndian.Uint64(data[:8]) 879 data = data[8:] 880 } 881 if !wireChopPath(&tmp.path, &data) { 882 return wireDecodeError 883 } else if len(data) != 0 { 884 return wireDecodeError 885 } 886 *l = tmp 887 return nil 888} 889 890/*********** 891 * dhtInfo * 892 ***********/ 893 894type dhtInfo struct { 895 seq uint64 896 key publicKey 897 peer *peer 898 rest *peer 899 root publicKey 900 rootSeq uint64 901 timer *time.Timer // time.AfterFunc to clean up after timeout, stop this on teardown 902} 903 904func (info *dhtInfo) getTeardown() *dhtTeardown { 905 return &dhtTeardown{seq: info.seq, key: info.key, root: info.root, rootSeq: info.rootSeq} 906} 907 908type dhtMapKey struct { 909 key publicKey 910 root publicKey 911 rootSeq uint64 912} 913 914func (info *dhtInfo) getMapKey() dhtMapKey { 915 return dhtMapKey{info.key, info.root, info.rootSeq} 916} 917 918/**************** 919 * dhtBootstrap * 920 ****************/ 921 922type dhtBootstrap struct { 923 label treeLabel 924} 925 926func (dbs *dhtBootstrap) check() bool { 927 return dbs.label.check() 928} 929 930func (dbs *dhtBootstrap) encode(out []byte) ([]byte, error) { 931 return dbs.label.encode(out) 932} 933 934func (dbs *dhtBootstrap) decode(data []byte) error { 935 var tmp dhtBootstrap 936 if err := tmp.label.decode(data); err != nil { 937 return err 938 } 939 *dbs = tmp 940 return nil 941} 942 943/***************** 944 * dhtSetupToken * 945 *****************/ 946 947// When you send a bootstrap, this is the thing you're trying to get back in a response. 948// It's what lets you open a path to a keyspace neighbor. 949 950// TODO? change the token format? The dest part contains a redundant sig inside of the treeLabel... technically we could reuse it, but that seems weird? 951// Maybe remove the sig from treeLabel, put that in a signedTreeLabel? 952 953type dhtSetupToken struct { 954 sig signature // Signed by dest 955 source publicKey // Who the dest permits a path from 956 dest treeLabel // Path to dest 957} 958 959func (st *dhtSetupToken) bytesForSig() []byte { 960 var bs []byte 961 bs = append(bs, st.source[:]...) 962 var err error 963 if bs, err = st.dest.encode(bs); err != nil { 964 panic("this should never happen") 965 } 966 return bs 967} 968 969// TODO? remove the redundant sig and check? both from same node, one should be a superset of the other... 970 971func (st *dhtSetupToken) check() bool { 972 bs := st.bytesForSig() 973 return st.dest.key.verify(bs, &st.sig) && st.dest.check() 974} 975 976func (st *dhtSetupToken) encode(out []byte) ([]byte, error) { 977 out = append(out, st.sig[:]...) 978 out = append(out, st.source[:]...) 979 return st.dest.encode(out) 980} 981 982func (st *dhtSetupToken) decode(data []byte) error { 983 if !wireChopSlice(st.sig[:], &data) { 984 return wireDecodeError 985 } else if !wireChopSlice(st.source[:], &data) { 986 return wireDecodeError 987 } 988 return st.dest.decode(data) 989} 990 991/******************* 992 * dhtBootstrapAck * 993 *******************/ 994 995type dhtBootstrapAck struct { 996 bootstrap dhtBootstrap 997 response dhtSetupToken 998} 999 1000func (ack *dhtBootstrapAck) check() bool { 1001 return ack.bootstrap.check() && ack.response.check() 1002} 1003 1004func (ack *dhtBootstrapAck) encode(out []byte) ([]byte, error) { 1005 var bootBytes, resBytes []byte // TODO get rid of these 1006 var err error 1007 if bootBytes, err = ack.bootstrap.encode(nil); err != nil { 1008 return nil, err 1009 } else if resBytes, err = ack.response.encode(nil); err != nil { 1010 return nil, err 1011 } 1012 out = wireEncodeUint(out, uint64(len(bootBytes))) 1013 out = append(out, bootBytes...) 1014 out = append(out, resBytes...) 1015 return out, nil 1016} 1017 1018func (ack *dhtBootstrapAck) decode(data []byte) error { 1019 bootLen, begin := wireDecodeUint(data) 1020 end := begin + int(bootLen) 1021 var tmp dhtBootstrapAck 1022 if end > len(data) { 1023 return wireDecodeError 1024 } else if err := tmp.bootstrap.decode(data[begin:end]); err != nil { 1025 return err 1026 } else if err := tmp.response.decode(data[end:]); err != nil { 1027 return err 1028 } 1029 *ack = tmp 1030 return nil 1031} 1032 1033/************ 1034 * dhtSetup * 1035 ************/ 1036 1037type dhtSetup struct { 1038 sig signature 1039 seq uint64 1040 token dhtSetupToken 1041} 1042 1043func (s *dhtSetup) bytesForSig() []byte { 1044 bs := make([]byte, 8) 1045 binary.BigEndian.PutUint64(bs, s.seq) 1046 var err error 1047 if bs, err = s.token.encode(bs); err != nil { 1048 panic("this should never happen") 1049 } 1050 return bs 1051} 1052 1053func (s *dhtSetup) check() bool { 1054 if !s.token.check() { 1055 return false 1056 } 1057 bs := s.bytesForSig() 1058 return s.token.source.verify(bs, &s.sig) 1059} 1060 1061func (s *dhtSetup) getTeardown() *dhtTeardown { 1062 return &dhtTeardown{ 1063 seq: s.seq, 1064 key: s.token.source, 1065 root: s.token.dest.root, 1066 rootSeq: s.token.dest.seq, 1067 } 1068} 1069 1070func (s *dhtSetup) encode(out []byte) ([]byte, error) { 1071 seq := make([]byte, 8) 1072 binary.BigEndian.PutUint64(seq, s.seq) 1073 out = append(out, s.sig[:]...) 1074 out = append(out, seq...) 1075 return s.token.encode(out) 1076} 1077 1078func (s *dhtSetup) decode(data []byte) error { 1079 var tmp dhtSetup 1080 if !wireChopSlice(tmp.sig[:], &data) { 1081 return wireDecodeError 1082 } 1083 if len(data) < 8 { 1084 return wireDecodeError 1085 } 1086 tmp.seq, data = binary.BigEndian.Uint64(data[:8]), data[8:] 1087 if err := tmp.token.decode(data); err != nil { 1088 return err 1089 } 1090 *s = tmp 1091 return nil 1092} 1093 1094/*************** 1095 * dhtTeardown * 1096 ***************/ 1097 1098type dhtTeardown struct { 1099 seq uint64 1100 key publicKey 1101 root publicKey 1102 rootSeq uint64 1103} 1104 1105func (t *dhtTeardown) getMapKey() dhtMapKey { 1106 return dhtMapKey{t.key, t.root, t.rootSeq} 1107} 1108 1109func (t *dhtTeardown) encode(out []byte) ([]byte, error) { 1110 seq := make([]byte, 8) 1111 binary.BigEndian.PutUint64(seq, t.seq) 1112 out = append(out, seq...) 1113 out = append(out, t.key[:]...) 1114 out = append(out, t.root[:]...) 1115 rseq := make([]byte, 8) 1116 binary.BigEndian.PutUint64(rseq, t.rootSeq) 1117 out = append(out, rseq...) 1118 return out, nil 1119} 1120 1121func (t *dhtTeardown) decode(data []byte) error { 1122 var tmp dhtTeardown 1123 if len(data) < 8 { 1124 return wireDecodeError 1125 } 1126 tmp.seq, data = binary.BigEndian.Uint64(data[:8]), data[8:] 1127 if !wireChopSlice(tmp.key[:], &data) { 1128 return wireDecodeError 1129 } else if !wireChopSlice(tmp.root[:], &data) { 1130 return wireDecodeError 1131 } else if len(data) != 8 { 1132 return wireDecodeError 1133 } 1134 tmp.rootSeq = binary.BigEndian.Uint64(data) 1135 *t = tmp 1136 return nil 1137} 1138 1139/************** 1140 * dhtTraffic * 1141 **************/ 1142 1143type dhtTraffic struct { 1144 source publicKey 1145 dest publicKey 1146 kind byte // in-band vs out-of-band, TODO? separate type? 1147 payload []byte 1148} 1149 1150func (t *dhtTraffic) encode(out []byte) ([]byte, error) { 1151 out = append(out, t.source[:]...) 1152 out = append(out, t.dest[:]...) 1153 out = append(out, t.kind) 1154 out = append(out, t.payload...) 1155 return out, nil 1156} 1157 1158func (t *dhtTraffic) decode(data []byte) error { 1159 var tmp dhtTraffic 1160 if !wireChopSlice(tmp.source[:], &data) { 1161 return wireDecodeError 1162 } else if !wireChopSlice(tmp.dest[:], &data) { 1163 return wireDecodeError 1164 } 1165 if len(data) < 1 { 1166 return wireDecodeError 1167 } 1168 tmp.kind, data = data[0], data[1:] 1169 tmp.payload = append(tmp.payload[:0], data...) 1170 *t = tmp 1171 return nil 1172} 1173 1174/********************* 1175 * utility functions * 1176 *********************/ 1177 1178func treeLess(key1, key2 publicKey) bool { 1179 for idx := range key1 { 1180 switch { 1181 case key1[idx] < key2[idx]: 1182 return true 1183 case key1[idx] > key2[idx]: 1184 return false 1185 } 1186 } 1187 return false 1188} 1189 1190func dhtOrdered(first, second, third publicKey) bool { 1191 return treeLess(first, second) && treeLess(second, third) 1192} 1193