package yggdrasil

// This is the session manager.
// It's responsible for keeping track of open sessions to other nodes.
// The session information consists of crypto keys and coords.

import (
	"bytes"
	"sync"
	"time"

	"github.com/yggdrasil-network/yggdrasil-go/src/address"
	"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
	"github.com/yggdrasil-network/yggdrasil-go/src/util"

	"github.com/Arceliar/phony"
)

// nonceWindow is how long after sessionInfo.time was last refreshed that
// _nonceIsOK keeps accepting nonces older than the newest one seen, to allow
// some out-of-order packet delivery.
const nonceWindow = time.Second

// sessionInfo holds all the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces,
// various sorts of timing information for timeout and maintenance, and some
// metadata for the admin API.
type sessionInfo struct {
	phony.Inbox                          // Protects all of the below, use it any time you read/change the contents of a session
	sessions      *sessions              // The session manager that owns this session
	theirAddr     address.Address        // Derived from theirPermPub's node ID in createSession
	theirSubnet   address.Subnet         // Derived from theirPermPub's node ID in createSession
	theirPermPub  crypto.BoxPubKey       // Remote node's permanent public key
	theirSesPub   crypto.BoxPubKey       // Remote node's session (ephemeral) public key, updated by _update
	mySesPub      crypto.BoxPubKey       // Our session public key, generated in createSession
	mySesPriv     crypto.BoxPrivKey      // Our session private key, generated in createSession
	sharedPermKey crypto.BoxSharedKey    // used for session pings
	sharedSesKey  crypto.BoxSharedKey    // derived from session keys; used to seal/open traffic
	theirHandle   crypto.Handle          // Handle the remote node asked us to put on traffic we send
	myHandle      crypto.Handle          // Our handle for this session; the key into sessions.sinfos
	theirNonce    crypto.BoxNonce        // Newest nonce accepted from the remote node
	myNonce       crypto.BoxNonce        // Next nonce we will use when sending
	theirMTU      uint16                 // MTU advertised by the remote node (1280 until told otherwise)
	myMTU         uint16                 // Our MTU, read from the IfMTU config option
	wasMTUFixed   bool                   // Was the MTU fixed by a receive error?
	timeOpened    time.Time              // Time the session was opened
	time          time.Time              // Time we last accepted a ping (_update) or a packet with a new highest nonce (_updateNonce)
	mtuTime       time.Time              // time myMTU was last changed
	pingTime      time.Time              // time the first ping was sent since the last received packet
	pingSend      time.Time              // time the last ping was sent
	coords        []byte                 // coords of destination
	reset         bool                   // Set by sessions.reset after a coord change; cleared by _update
	tstamp        int64                  // Tstamp from their last accepted session ping, for replay mitigation; read and written by _update inside the actor (no atomic operations are used in this file)
	bytesSent     uint64                 // Bytes of real traffic sent in this session
	bytesRecvd    uint64                 // Bytes of real traffic received in this session
	init          chan struct{}          // Closed when the first session ping/pong is accepted, used to signal that the session is ready for initial use
	cancel        util.Cancellation      // Used to terminate workers
	conn          *Conn                  // The associated Conn object (may be nil until setConn runs)
	callbacks     []chan func()          // Finished work from crypto workers, in the order it was queued
}

// reconfigure is currently a no-op placeholder for per-session reconfiguration.
func (sinfo *sessionInfo) reconfigure() {
	// This is where reconfiguration would go, if we had anything to do
}

// sessionPing represents a session ping/pong packet, and includes
// information like public keys, a session handle, coords, a timestamp to
// prevent replays, and the tun/tap MTU.
type sessionPing struct {
	SendPermPub crypto.BoxPubKey // Sender's permanent key
	Handle      crypto.Handle    // Random number to ID session
	SendSesPub  crypto.BoxPubKey // Session key to use
	Coords      []byte           // Sender's coords
	Tstamp      int64            // unix time, but the only real requirement is that it increases
	IsPong      bool             // true for a reply to a ping
	MTU         uint16           // Sender's MTU
}

// _update updates session info in response to a ping, after checking that
// the ping is OK (strictly newer timestamp, same permanent key). It is called
// from within the session's actor (see handlePing).
// Returns true if the session was updated, or false otherwise.
75func (s *sessionInfo) _update(p *sessionPing) bool { 76 if !(p.Tstamp > s.tstamp) { 77 // To protect against replay attacks 78 return false 79 } 80 if p.SendPermPub != s.theirPermPub { 81 // Should only happen if two sessions got the same handle 82 // That shouldn't be allowed anyway, but if it happens then let one time out 83 return false 84 } 85 if p.SendSesPub != s.theirSesPub { 86 s.theirSesPub = p.SendSesPub 87 s.theirHandle = p.Handle 88 s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub) 89 s.theirNonce = crypto.BoxNonce{} 90 } 91 if p.MTU >= 1280 || p.MTU == 0 { 92 s.theirMTU = p.MTU 93 if s.conn != nil { 94 s.conn.setMTU(s, s._getMTU()) 95 } 96 } 97 if !bytes.Equal(s.coords, p.Coords) { 98 // allocate enough space for additional coords 99 s.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) 100 } 101 s.time = time.Now() 102 s.tstamp = p.Tstamp 103 s.reset = false 104 defer func() { recover() }() // Recover if the below panics 105 select { 106 case <-s.init: 107 default: 108 // Unblock anything waiting for the session to initialize 109 close(s.init) 110 } 111 return true 112} 113 114// Struct of all active sessions. 115// Sessions are indexed by handle. 116// Additionally, stores maps of address/subnet onto keys, and keys onto handles. 117type sessions struct { 118 router *router 119 listener *Listener 120 listenerMutex sync.Mutex 121 lastCleanup time.Time 122 isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed 123 isAllowedMutex sync.RWMutex // Protects the above 124 permShared map[crypto.BoxPubKey]*crypto.BoxSharedKey // Maps known permanent keys to their shared key, used by DHT a lot 125 sinfos map[crypto.Handle]*sessionInfo // Maps handle onto session info 126 byTheirPerm map[crypto.BoxPubKey]*crypto.Handle // Maps theirPermPub onto handle 127} 128 129// Initializes the session struct. 
130func (ss *sessions) init(r *router) { 131 ss.router = r 132 ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey) 133 ss.sinfos = make(map[crypto.Handle]*sessionInfo) 134 ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle) 135 ss.lastCleanup = time.Now() 136} 137 138func (ss *sessions) reconfigure() { 139 for _, session := range ss.sinfos { 140 session.reconfigure() 141 } 142} 143 144// Determines whether the session with a given publickey is allowed based on 145// session firewall rules. 146func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool { 147 ss.isAllowedMutex.RLock() 148 defer ss.isAllowedMutex.RUnlock() 149 150 if ss.isAllowedHandler == nil { 151 return true 152 } 153 154 return ss.isAllowedHandler(pubkey, initiator) 155} 156 157// Gets the session corresponding to a given handle. 158func (ss *sessions) getSessionForHandle(handle *crypto.Handle) (*sessionInfo, bool) { 159 sinfo, isIn := ss.sinfos[*handle] 160 return sinfo, isIn 161} 162 163// Gets a session corresponding to a permanent key used by the remote node. 164func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) { 165 h, isIn := ss.byTheirPerm[*key] 166 if !isIn { 167 return nil, false 168 } 169 sinfo, isIn := ss.getSessionForHandle(h) 170 return sinfo, isIn 171} 172 173// Creates a new session and lazily cleans up old existing sessions. This 174// includse initializing session info to sane defaults (e.g. lowest supported 175// MTU). 
176func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { 177 // TODO: this check definitely needs to be moved 178 if !ss.isSessionAllowed(theirPermKey, true) { 179 return nil 180 } 181 sinfo := sessionInfo{} 182 sinfo.sessions = ss 183 sinfo.theirPermPub = *theirPermKey 184 sinfo.sharedPermKey = *ss.getSharedKey(&ss.router.core.boxPriv, &sinfo.theirPermPub) 185 pub, priv := crypto.NewBoxKeys() 186 sinfo.mySesPub = *pub 187 sinfo.mySesPriv = *priv 188 sinfo.myNonce = *crypto.NewBoxNonce() 189 sinfo.theirMTU = 1280 190 ss.router.core.config.Mutex.RLock() 191 sinfo.myMTU = uint16(ss.router.core.config.Current.IfMTU) 192 ss.router.core.config.Mutex.RUnlock() 193 now := time.Now() 194 sinfo.timeOpened = now 195 sinfo.time = now 196 sinfo.mtuTime = now 197 sinfo.pingTime = now 198 sinfo.pingSend = now 199 sinfo.init = make(chan struct{}) 200 sinfo.cancel = util.NewCancellation() 201 higher := false 202 for idx := range ss.router.core.boxPub { 203 if ss.router.core.boxPub[idx] > sinfo.theirPermPub[idx] { 204 higher = true 205 break 206 } else if ss.router.core.boxPub[idx] < sinfo.theirPermPub[idx] { 207 break 208 } 209 } 210 if higher { 211 // higher => odd nonce 212 sinfo.myNonce[len(sinfo.myNonce)-1] |= 0x01 213 } else { 214 // lower => even nonce 215 sinfo.myNonce[len(sinfo.myNonce)-1] &= 0xfe 216 } 217 sinfo.myHandle = *crypto.NewHandle() 218 sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) 219 sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) 220 ss.sinfos[sinfo.myHandle] = &sinfo 221 ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle 222 return &sinfo 223} 224 225func (ss *sessions) cleanup() { 226 // Time thresholds almost certainly could use some adjusting 227 for k := range ss.permShared { 228 // Delete a key, to make sure this eventually shrinks to 0 229 delete(ss.permShared, k) 230 break 231 } 232 if time.Since(ss.lastCleanup) < time.Minute { 233 return 234 } 235 
permShared := make(map[crypto.BoxPubKey]*crypto.BoxSharedKey, len(ss.permShared)) 236 for k, v := range ss.permShared { 237 permShared[k] = v 238 } 239 ss.permShared = permShared 240 sinfos := make(map[crypto.Handle]*sessionInfo, len(ss.sinfos)) 241 for k, v := range ss.sinfos { 242 sinfos[k] = v 243 } 244 ss.sinfos = sinfos 245 byTheirPerm := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byTheirPerm)) 246 for k, v := range ss.byTheirPerm { 247 byTheirPerm[k] = v 248 } 249 ss.byTheirPerm = byTheirPerm 250 ss.lastCleanup = time.Now() 251} 252 253func (sinfo *sessionInfo) doRemove() { 254 sinfo.sessions.router.Act(nil, func() { 255 sinfo.sessions.removeSession(sinfo) 256 }) 257} 258 259// Closes a session, removing it from sessions maps. 260func (ss *sessions) removeSession(sinfo *sessionInfo) { 261 if s := sinfo.sessions.sinfos[sinfo.myHandle]; s == sinfo { 262 delete(sinfo.sessions.sinfos, sinfo.myHandle) 263 delete(sinfo.sessions.byTheirPerm, sinfo.theirPermPub) 264 } 265} 266 267// Returns a session ping appropriate for the given session info. 268func (sinfo *sessionInfo) _getPing() sessionPing { 269 loc := sinfo.sessions.router.core.switchTable.getLocator() 270 coords := loc.getCoords() 271 ping := sessionPing{ 272 SendPermPub: sinfo.sessions.router.core.boxPub, 273 Handle: sinfo.myHandle, 274 SendSesPub: sinfo.mySesPub, 275 Tstamp: time.Now().Unix(), 276 Coords: coords, 277 MTU: sinfo.myMTU, 278 } 279 sinfo.myNonce.Increment() 280 return ping 281} 282 283// Gets the shared key for a pair of box keys. 284// Used to cache recently used shared keys for protocol traffic. 285// This comes up with dht req/res and session ping/pong traffic. 
286func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey, 287 theirPub *crypto.BoxPubKey) *crypto.BoxSharedKey { 288 return crypto.GetSharedKey(myPriv, theirPub) 289 // FIXME concurrency issues with the below, so for now we just burn the CPU every time 290 if skey, isIn := ss.permShared[*theirPub]; isIn { 291 return skey 292 } 293 // First do some cleanup 294 const maxKeys = 1024 295 for key := range ss.permShared { 296 // Remove a random key until the store is small enough 297 if len(ss.permShared) < maxKeys { 298 break 299 } 300 delete(ss.permShared, key) 301 } 302 ss.permShared[*theirPub] = crypto.GetSharedKey(myPriv, theirPub) 303 return ss.permShared[*theirPub] 304} 305 306// Sends a session ping by calling sendPingPong in ping mode. 307func (sinfo *sessionInfo) ping(from phony.Actor) { 308 sinfo.Act(from, func() { 309 sinfo._sendPingPong(false) 310 }) 311} 312 313// Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it. 314// Updates the time the last ping was sent in the session info. 
315func (sinfo *sessionInfo) _sendPingPong(isPong bool) { 316 ping := sinfo._getPing() 317 ping.IsPong = isPong 318 bs := ping.encode() 319 payload, nonce := crypto.BoxSeal(&sinfo.sharedPermKey, bs, nil) 320 p := wire_protoTrafficPacket{ 321 Coords: sinfo.coords, 322 ToKey: sinfo.theirPermPub, 323 FromKey: sinfo.sessions.router.core.boxPub, 324 Nonce: *nonce, 325 Payload: payload, 326 } 327 packet := p.encode() 328 // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first 329 sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) 330 if sinfo.pingTime.Before(sinfo.time) { 331 sinfo.pingTime = time.Now() 332 } 333} 334 335func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) { 336 sinfo.Act(from, func() { 337 sinfo.conn = conn 338 sinfo.conn.setMTU(sinfo, sinfo._getMTU()) 339 }) 340} 341 342// Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. 343// If the session has a packet cached (common when first setting up a session), it will be sent. 344func (ss *sessions) handlePing(ping *sessionPing) { 345 // Get the corresponding session (or create a new session) 346 sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) 347 switch { 348 case ping.IsPong: // This is a response, not an initial ping, so ignore it. 349 case isIn: // Session already exists 350 case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed 351 default: 352 ss.listenerMutex.Lock() 353 if ss.listener != nil { 354 // This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions. 355 // We need to create a session and pass it to the listener. 
356 sinfo = ss.createSession(&ping.SendPermPub) 357 if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo { 358 panic("This should not happen") 359 } 360 conn := newConn(ss.router.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo) 361 for i := range conn.nodeMask { 362 conn.nodeMask[i] = 0xFF 363 } 364 sinfo.setConn(ss.router, conn) 365 c := ss.listener.conn 366 go func() { c <- conn }() 367 } 368 ss.listenerMutex.Unlock() 369 } 370 if sinfo != nil { 371 sinfo.Act(ss.router, func() { 372 // Update the session 373 if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ 374 return 375 } 376 if !ping.IsPong { 377 sinfo._sendPingPong(true) 378 } 379 }) 380 } 381} 382 383// Get the MTU of the session. 384// Will be equal to the smaller of this node's MTU or the remote node's MTU. 385// If sending over links with a maximum message size (this was a thing with the old UDP code), it could be further lowered, to a minimum of 1280. 386func (sinfo *sessionInfo) _getMTU() uint16 { 387 if sinfo.theirMTU == 0 || sinfo.myMTU == 0 { 388 return 0 389 } 390 if sinfo.theirMTU < sinfo.myMTU { 391 return sinfo.theirMTU 392 } 393 return sinfo.myMTU 394} 395 396// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received. 
397func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool { 398 // The bitmask is to allow for some non-duplicate out-of-order packets 399 if theirNonce.Minus(&sinfo.theirNonce) > 0 { 400 // This is newer than the newest nonce we've seen 401 return true 402 } 403 return time.Since(sinfo.time) < nonceWindow 404} 405 406// Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce 407func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { 408 if theirNonce.Minus(&sinfo.theirNonce) > 0 { 409 // This nonce is the newest we've seen, so make a note of that 410 sinfo.theirNonce = *theirNonce 411 sinfo.time = time.Now() 412 } 413} 414 415// Resets all sessions to an uninitialized state. 416// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. 417// Only call this from the router actor. 418func (ss *sessions) reset() { 419 for _, _sinfo := range ss.sinfos { 420 sinfo := _sinfo // So we can safely put it in a closure 421 sinfo.Act(ss.router, func() { 422 sinfo.reset = true 423 }) 424 } 425} 426 427//////////////////////////////////////////////////////////////////////////////// 428//////////////////////////// Worker Functions Below //////////////////////////// 429//////////////////////////////////////////////////////////////////////////////// 430 431type sessionCryptoManager struct { 432 phony.Inbox 433} 434 435func (m *sessionCryptoManager) workerGo(from phony.Actor, f func()) { 436 m.Act(from, func() { 437 util.WorkerGo(f) 438 }) 439} 440 441var manager = sessionCryptoManager{} 442 443type FlowKeyMessage struct { 444 FlowKey uint64 445 Message []byte 446} 447 448func (sinfo *sessionInfo) recv(from phony.Actor, packet *wire_trafficPacket) { 449 sinfo.Act(from, func() { 450 sinfo._recvPacket(packet) 451 }) 452} 453 454func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) { 
455 select { 456 case <-sinfo.init: 457 default: 458 // TODO find a better way to drop things until initialized 459 util.PutBytes(p.Payload) 460 return 461 } 462 if !sinfo._nonceIsOK(&p.Nonce) { 463 util.PutBytes(p.Payload) 464 return 465 } 466 k := sinfo.sharedSesKey 467 var isOK bool 468 var bs []byte 469 ch := make(chan func(), 1) 470 poolFunc := func() { 471 bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce) 472 callback := func() { 473 util.PutBytes(p.Payload) 474 if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { 475 // Either we failed to decrypt, or the session was updated, or we 476 // received this packet in the mean time 477 util.PutBytes(bs) 478 return 479 } 480 sinfo._updateNonce(&p.Nonce) 481 sinfo.bytesRecvd += uint64(len(bs)) 482 sinfo.conn.recvMsg(sinfo, bs) 483 } 484 ch <- callback 485 sinfo.checkCallbacks() 486 } 487 sinfo.callbacks = append(sinfo.callbacks, ch) 488 manager.workerGo(sinfo, poolFunc) 489} 490 491func (sinfo *sessionInfo) _send(msg FlowKeyMessage) { 492 select { 493 case <-sinfo.init: 494 default: 495 // TODO find a better way to drop things until initialized 496 util.PutBytes(msg.Message) 497 return 498 } 499 sinfo.bytesSent += uint64(len(msg.Message)) 500 coords := append([]byte(nil), sinfo.coords...) 501 if msg.FlowKey != 0 { 502 coords = append(coords, 0) 503 coords = append(coords, wire_encode_uint64(msg.FlowKey)...) 
504 } 505 p := wire_trafficPacket{ 506 Coords: coords, 507 Handle: sinfo.theirHandle, 508 Nonce: sinfo.myNonce, 509 } 510 sinfo.myNonce.Increment() 511 k := sinfo.sharedSesKey 512 ch := make(chan func(), 1) 513 poolFunc := func() { 514 p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce) 515 callback := func() { 516 // Encoding may block on a util.GetBytes(), so kept out of the worker pool 517 packet := p.encode() 518 // Cleanup 519 util.PutBytes(msg.Message) 520 util.PutBytes(p.Payload) 521 // Send the packet 522 // TODO replace this with a send to the peer struct if that becomes an actor 523 sinfo.sessions.router.Act(sinfo, func() { 524 sinfo.sessions.router.out(packet) 525 }) 526 } 527 ch <- callback 528 sinfo.checkCallbacks() 529 } 530 sinfo.callbacks = append(sinfo.callbacks, ch) 531 manager.workerGo(sinfo, poolFunc) 532} 533 534func (sinfo *sessionInfo) checkCallbacks() { 535 sinfo.Act(nil, func() { 536 if len(sinfo.callbacks) > 0 { 537 select { 538 case callback := <-sinfo.callbacks[0]: 539 sinfo.callbacks = sinfo.callbacks[1:] 540 callback() 541 sinfo.checkCallbacks() 542 default: 543 } 544 } 545 }) 546} 547