1package yggdrasil
2
3// This is the session manager
4// It's responsible for keeping track of open sessions to other nodes
5// The session information consists of crypto keys and coords
6
7import (
8	"bytes"
9	"sync"
10	"time"
11
12	"github.com/yggdrasil-network/yggdrasil-go/src/address"
13	"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
14	"github.com/yggdrasil-network/yggdrasil-go/src/util"
15
16	"github.com/Arceliar/phony"
17)
18
// nonceWindow bounds how long packets with a nonce that is not newer than the
// newest one seen are still accepted, to allow some out-of-order packet
// delivery. It is measured from the session's last-activity time (see
// sessionInfo._nonceIsOK).
const nonceWindow = time.Second
21
// All the information we know about an active session.
// This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API.
// The embedded phony.Inbox makes the session an actor; methods whose names start with an underscore are expected to run inside that actor.
type sessionInfo struct {
	phony.Inbox                       // Protects all of the below, use it any time you read/change the contents of a session
	sessions      *sessions           // The session manager that created this session
	theirAddr     address.Address     // Address derived from the remote node's ID
	theirSubnet   address.Subnet      // Subnet derived from the remote node's ID
	theirPermPub  crypto.BoxPubKey    // The remote node's permanent public key
	theirSesPub   crypto.BoxPubKey    // The remote node's session public key, learned from their pings
	mySesPub      crypto.BoxPubKey    // Our session public key, generated when the session is created
	mySesPriv     crypto.BoxPrivKey   // Our session private key, generated when the session is created
	sharedPermKey crypto.BoxSharedKey // used for session pings
	sharedSesKey  crypto.BoxSharedKey // derived from session keys
	theirHandle   crypto.Handle       // Handle from their last accepted ping, put into traffic packets we send
	myHandle      crypto.Handle       // Our handle for this session, used as the key in sessions.sinfos
	theirNonce    crypto.BoxNonce     // Newest nonce seen in traffic from the remote node
	myNonce       crypto.BoxNonce     // Nonce to use for the next packet we send
	theirMTU      uint16              // MTU advertised in their pings (starts at 1280)
	myMTU         uint16              // Our MTU, taken from the IfMTU config option
	wasMTUFixed   bool                // Was the MTU fixed by a receive error?
	timeOpened    time.Time           // Time the session was opened
	time          time.Time           // Time we last accepted a ping or a packet carrying a new newest nonce
	mtuTime       time.Time           // time myMTU was last changed
	pingTime      time.Time           // time the first ping was sent since the last received packet
	pingSend      time.Time           // time the last ping was sent
	coords        []byte              // coords of destination
	reset         bool                // set by sessions.reset() after a coord change, cleared when a ping is accepted
	tstamp        int64               // ATOMIC - tstamp from their last session ping, replay attack mitigation
	bytesSent     uint64              // Bytes of real traffic sent in this session
	bytesRecvd    uint64              // Bytes of real traffic received in this session
	init          chan struct{}       // Closed when the first session pong arrives, used to signal that the session is ready for initial use
	cancel        util.Cancellation   // Used to terminate workers
	conn          *Conn               // The associated Conn object
	callbacks     []chan func()       // Finished work from crypto workers, queued in submission order
}
57
58func (sinfo *sessionInfo) reconfigure() {
59	// This is where reconfiguration would go, if we had anything to do
60}
61
// Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
type sessionPing struct {
	SendPermPub crypto.BoxPubKey // Sender's permanent key
	Handle      crypto.Handle    // Random number to ID session
	SendSesPub  crypto.BoxPubKey // Session key to use
	Coords      []byte           // Sender's coords, taken from its switch locator
	Tstamp      int64            // unix time, but the only real requirement is that it increases
	IsPong      bool             // True if this is a reply to a ping rather than an initial ping
	MTU         uint16           // Sender's MTU for this session
}
72
73// Updates session info in response to a ping, after checking that the ping is OK.
74// Returns true if the session was updated, or false otherwise.
75func (s *sessionInfo) _update(p *sessionPing) bool {
76	if !(p.Tstamp > s.tstamp) {
77		// To protect against replay attacks
78		return false
79	}
80	if p.SendPermPub != s.theirPermPub {
81		// Should only happen if two sessions got the same handle
82		// That shouldn't be allowed anyway, but if it happens then let one time out
83		return false
84	}
85	if p.SendSesPub != s.theirSesPub {
86		s.theirSesPub = p.SendSesPub
87		s.theirHandle = p.Handle
88		s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub)
89		s.theirNonce = crypto.BoxNonce{}
90	}
91	if p.MTU >= 1280 || p.MTU == 0 {
92		s.theirMTU = p.MTU
93		if s.conn != nil {
94			s.conn.setMTU(s, s._getMTU())
95		}
96	}
97	if !bytes.Equal(s.coords, p.Coords) {
98		// allocate enough space for additional coords
99		s.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...)
100	}
101	s.time = time.Now()
102	s.tstamp = p.Tstamp
103	s.reset = false
104	defer func() { recover() }() // Recover if the below panics
105	select {
106	case <-s.init:
107	default:
108		// Unblock anything waiting for the session to initialize
109		close(s.init)
110	}
111	return true
112}
113
// Struct of all active sessions.
// Sessions are indexed by handle.
// Additionally, stores a map of the remote permanent keys onto handles, and a cache of shared keys.
type sessions struct {
	router           *router                                             // The router that owns this session manager
	listener         *Listener                                           // Receives the Conn of each session created from an incoming ping, if set
	listenerMutex    sync.Mutex                                          // Protects the above
	lastCleanup      time.Time                                           // Last time cleanup() rebuilt the maps below
	isAllowedHandler func(pubkey *crypto.BoxPubKey, initiator bool) bool // Returns true or false if session setup is allowed
	isAllowedMutex   sync.RWMutex                                        // Protects the above
	permShared       map[crypto.BoxPubKey]*crypto.BoxSharedKey           // Maps known permanent keys to their shared key, used by DHT a lot
	sinfos           map[crypto.Handle]*sessionInfo                      // Maps handle onto session info
	byTheirPerm      map[crypto.BoxPubKey]*crypto.Handle                 // Maps theirPermPub onto handle
}
128
129// Initializes the session struct.
130func (ss *sessions) init(r *router) {
131	ss.router = r
132	ss.permShared = make(map[crypto.BoxPubKey]*crypto.BoxSharedKey)
133	ss.sinfos = make(map[crypto.Handle]*sessionInfo)
134	ss.byTheirPerm = make(map[crypto.BoxPubKey]*crypto.Handle)
135	ss.lastCleanup = time.Now()
136}
137
138func (ss *sessions) reconfigure() {
139	for _, session := range ss.sinfos {
140		session.reconfigure()
141	}
142}
143
144// Determines whether the session with a given publickey is allowed based on
145// session firewall rules.
146func (ss *sessions) isSessionAllowed(pubkey *crypto.BoxPubKey, initiator bool) bool {
147	ss.isAllowedMutex.RLock()
148	defer ss.isAllowedMutex.RUnlock()
149
150	if ss.isAllowedHandler == nil {
151		return true
152	}
153
154	return ss.isAllowedHandler(pubkey, initiator)
155}
156
157// Gets the session corresponding to a given handle.
158func (ss *sessions) getSessionForHandle(handle *crypto.Handle) (*sessionInfo, bool) {
159	sinfo, isIn := ss.sinfos[*handle]
160	return sinfo, isIn
161}
162
163// Gets a session corresponding to a permanent key used by the remote node.
164func (ss *sessions) getByTheirPerm(key *crypto.BoxPubKey) (*sessionInfo, bool) {
165	h, isIn := ss.byTheirPerm[*key]
166	if !isIn {
167		return nil, false
168	}
169	sinfo, isIn := ss.getSessionForHandle(h)
170	return sinfo, isIn
171}
172
173// Creates a new session and lazily cleans up old existing sessions. This
174// includse initializing session info to sane defaults (e.g. lowest supported
175// MTU).
176func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
177	// TODO: this check definitely needs to be moved
178	if !ss.isSessionAllowed(theirPermKey, true) {
179		return nil
180	}
181	sinfo := sessionInfo{}
182	sinfo.sessions = ss
183	sinfo.theirPermPub = *theirPermKey
184	sinfo.sharedPermKey = *ss.getSharedKey(&ss.router.core.boxPriv, &sinfo.theirPermPub)
185	pub, priv := crypto.NewBoxKeys()
186	sinfo.mySesPub = *pub
187	sinfo.mySesPriv = *priv
188	sinfo.myNonce = *crypto.NewBoxNonce()
189	sinfo.theirMTU = 1280
190	ss.router.core.config.Mutex.RLock()
191	sinfo.myMTU = uint16(ss.router.core.config.Current.IfMTU)
192	ss.router.core.config.Mutex.RUnlock()
193	now := time.Now()
194	sinfo.timeOpened = now
195	sinfo.time = now
196	sinfo.mtuTime = now
197	sinfo.pingTime = now
198	sinfo.pingSend = now
199	sinfo.init = make(chan struct{})
200	sinfo.cancel = util.NewCancellation()
201	higher := false
202	for idx := range ss.router.core.boxPub {
203		if ss.router.core.boxPub[idx] > sinfo.theirPermPub[idx] {
204			higher = true
205			break
206		} else if ss.router.core.boxPub[idx] < sinfo.theirPermPub[idx] {
207			break
208		}
209	}
210	if higher {
211		// higher => odd nonce
212		sinfo.myNonce[len(sinfo.myNonce)-1] |= 0x01
213	} else {
214		// lower => even nonce
215		sinfo.myNonce[len(sinfo.myNonce)-1] &= 0xfe
216	}
217	sinfo.myHandle = *crypto.NewHandle()
218	sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
219	sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
220	ss.sinfos[sinfo.myHandle] = &sinfo
221	ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
222	return &sinfo
223}
224
225func (ss *sessions) cleanup() {
226	// Time thresholds almost certainly could use some adjusting
227	for k := range ss.permShared {
228		// Delete a key, to make sure this eventually shrinks to 0
229		delete(ss.permShared, k)
230		break
231	}
232	if time.Since(ss.lastCleanup) < time.Minute {
233		return
234	}
235	permShared := make(map[crypto.BoxPubKey]*crypto.BoxSharedKey, len(ss.permShared))
236	for k, v := range ss.permShared {
237		permShared[k] = v
238	}
239	ss.permShared = permShared
240	sinfos := make(map[crypto.Handle]*sessionInfo, len(ss.sinfos))
241	for k, v := range ss.sinfos {
242		sinfos[k] = v
243	}
244	ss.sinfos = sinfos
245	byTheirPerm := make(map[crypto.BoxPubKey]*crypto.Handle, len(ss.byTheirPerm))
246	for k, v := range ss.byTheirPerm {
247		byTheirPerm[k] = v
248	}
249	ss.byTheirPerm = byTheirPerm
250	ss.lastCleanup = time.Now()
251}
252
253func (sinfo *sessionInfo) doRemove() {
254	sinfo.sessions.router.Act(nil, func() {
255		sinfo.sessions.removeSession(sinfo)
256	})
257}
258
259// Closes a session, removing it from sessions maps.
260func (ss *sessions) removeSession(sinfo *sessionInfo) {
261	if s := sinfo.sessions.sinfos[sinfo.myHandle]; s == sinfo {
262		delete(sinfo.sessions.sinfos, sinfo.myHandle)
263		delete(sinfo.sessions.byTheirPerm, sinfo.theirPermPub)
264	}
265}
266
267// Returns a session ping appropriate for the given session info.
268func (sinfo *sessionInfo) _getPing() sessionPing {
269	loc := sinfo.sessions.router.core.switchTable.getLocator()
270	coords := loc.getCoords()
271	ping := sessionPing{
272		SendPermPub: sinfo.sessions.router.core.boxPub,
273		Handle:      sinfo.myHandle,
274		SendSesPub:  sinfo.mySesPub,
275		Tstamp:      time.Now().Unix(),
276		Coords:      coords,
277		MTU:         sinfo.myMTU,
278	}
279	sinfo.myNonce.Increment()
280	return ping
281}
282
283// Gets the shared key for a pair of box keys.
284// Used to cache recently used shared keys for protocol traffic.
285// This comes up with dht req/res and session ping/pong traffic.
286func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey,
287	theirPub *crypto.BoxPubKey) *crypto.BoxSharedKey {
288	return crypto.GetSharedKey(myPriv, theirPub)
289	// FIXME concurrency issues with the below, so for now we just burn the CPU every time
290	if skey, isIn := ss.permShared[*theirPub]; isIn {
291		return skey
292	}
293	// First do some cleanup
294	const maxKeys = 1024
295	for key := range ss.permShared {
296		// Remove a random key until the store is small enough
297		if len(ss.permShared) < maxKeys {
298			break
299		}
300		delete(ss.permShared, key)
301	}
302	ss.permShared[*theirPub] = crypto.GetSharedKey(myPriv, theirPub)
303	return ss.permShared[*theirPub]
304}
305
306// Sends a session ping by calling sendPingPong in ping mode.
307func (sinfo *sessionInfo) ping(from phony.Actor) {
308	sinfo.Act(from, func() {
309		sinfo._sendPingPong(false)
310	})
311}
312
313// Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it.
314// Updates the time the last ping was sent in the session info.
315func (sinfo *sessionInfo) _sendPingPong(isPong bool) {
316	ping := sinfo._getPing()
317	ping.IsPong = isPong
318	bs := ping.encode()
319	payload, nonce := crypto.BoxSeal(&sinfo.sharedPermKey, bs, nil)
320	p := wire_protoTrafficPacket{
321		Coords:  sinfo.coords,
322		ToKey:   sinfo.theirPermPub,
323		FromKey: sinfo.sessions.router.core.boxPub,
324		Nonce:   *nonce,
325		Payload: payload,
326	}
327	packet := p.encode()
328	// TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first
329	sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) })
330	if sinfo.pingTime.Before(sinfo.time) {
331		sinfo.pingTime = time.Now()
332	}
333}
334
335func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) {
336	sinfo.Act(from, func() {
337		sinfo.conn = conn
338		sinfo.conn.setMTU(sinfo, sinfo._getMTU())
339	})
340}
341
342// Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful.
343// If the session has a packet cached (common when first setting up a session), it will be sent.
344func (ss *sessions) handlePing(ping *sessionPing) {
345	// Get the corresponding session (or create a new session)
346	sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub)
347	switch {
348	case ping.IsPong: // This is a response, not an initial ping, so ignore it.
349	case isIn: // Session already exists
350	case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed
351	default:
352		ss.listenerMutex.Lock()
353		if ss.listener != nil {
354			// This is a ping from an allowed node for which no session exists, and we have a listener ready to handle sessions.
355			// We need to create a session and pass it to the listener.
356			sinfo = ss.createSession(&ping.SendPermPub)
357			if s, _ := ss.getByTheirPerm(&ping.SendPermPub); s != sinfo {
358				panic("This should not happen")
359			}
360			conn := newConn(ss.router.core, crypto.GetNodeID(&sinfo.theirPermPub), &crypto.NodeID{}, sinfo)
361			for i := range conn.nodeMask {
362				conn.nodeMask[i] = 0xFF
363			}
364			sinfo.setConn(ss.router, conn)
365			c := ss.listener.conn
366			go func() { c <- conn }()
367		}
368		ss.listenerMutex.Unlock()
369	}
370	if sinfo != nil {
371		sinfo.Act(ss.router, func() {
372			// Update the session
373			if !sinfo._update(ping) { /*panic("Should not happen in testing")*/
374				return
375			}
376			if !ping.IsPong {
377				sinfo._sendPingPong(true)
378			}
379		})
380	}
381}
382
383// Get the MTU of the session.
384// Will be equal to the smaller of this node's MTU or the remote node's MTU.
385// If sending over links with a maximum message size (this was a thing with the old UDP code), it could be further lowered, to a minimum of 1280.
386func (sinfo *sessionInfo) _getMTU() uint16 {
387	if sinfo.theirMTU == 0 || sinfo.myMTU == 0 {
388		return 0
389	}
390	if sinfo.theirMTU < sinfo.myMTU {
391		return sinfo.theirMTU
392	}
393	return sinfo.myMTU
394}
395
396// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received.
397func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool {
398	// The bitmask is to allow for some non-duplicate out-of-order packets
399	if theirNonce.Minus(&sinfo.theirNonce) > 0 {
400		// This is newer than the newest nonce we've seen
401		return true
402	}
403	return time.Since(sinfo.time) < nonceWindow
404}
405
406// Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce
407func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) {
408	if theirNonce.Minus(&sinfo.theirNonce) > 0 {
409		// This nonce is the newest we've seen, so make a note of that
410		sinfo.theirNonce = *theirNonce
411		sinfo.time = time.Now()
412	}
413}
414
415// Resets all sessions to an uninitialized state.
416// Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change.
417// Only call this from the router actor.
418func (ss *sessions) reset() {
419	for _, _sinfo := range ss.sinfos {
420		sinfo := _sinfo // So we can safely put it in a closure
421		sinfo.Act(ss.router, func() {
422			sinfo.reset = true
423		})
424	}
425}
426
427////////////////////////////////////////////////////////////////////////////////
428//////////////////////////// Worker Functions Below ////////////////////////////
429////////////////////////////////////////////////////////////////////////////////
430
// sessionCryptoManager is an actor through which sessions submit their crypto
// work (see workerGo).
type sessionCryptoManager struct {
	phony.Inbox
}
434
435func (m *sessionCryptoManager) workerGo(from phony.Actor, f func()) {
436	m.Act(from, func() {
437		util.WorkerGo(f)
438	})
439}
440
// manager is the package-wide sessionCryptoManager used by sessions to submit
// their encryption and decryption work.
var manager = sessionCryptoManager{}
442
// FlowKeyMessage is a message to be sent over a session, together with an
// optional flow key.
type FlowKeyMessage struct {
	FlowKey uint64 // If nonzero, appended (after a 0 byte) to the coords of the outgoing packet
	Message []byte // Plaintext payload; handed back to util.PutBytes once sent or dropped
}
447
448func (sinfo *sessionInfo) recv(from phony.Actor, packet *wire_trafficPacket) {
449	sinfo.Act(from, func() {
450		sinfo._recvPacket(packet)
451	})
452}
453
454func (sinfo *sessionInfo) _recvPacket(p *wire_trafficPacket) {
455	select {
456	case <-sinfo.init:
457	default:
458		// TODO find a better way to drop things until initialized
459		util.PutBytes(p.Payload)
460		return
461	}
462	if !sinfo._nonceIsOK(&p.Nonce) {
463		util.PutBytes(p.Payload)
464		return
465	}
466	k := sinfo.sharedSesKey
467	var isOK bool
468	var bs []byte
469	ch := make(chan func(), 1)
470	poolFunc := func() {
471		bs, isOK = crypto.BoxOpen(&k, p.Payload, &p.Nonce)
472		callback := func() {
473			util.PutBytes(p.Payload)
474			if !isOK || k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) {
475				// Either we failed to decrypt, or the session was updated, or we
476				// received this packet in the mean time
477				util.PutBytes(bs)
478				return
479			}
480			sinfo._updateNonce(&p.Nonce)
481			sinfo.bytesRecvd += uint64(len(bs))
482			sinfo.conn.recvMsg(sinfo, bs)
483		}
484		ch <- callback
485		sinfo.checkCallbacks()
486	}
487	sinfo.callbacks = append(sinfo.callbacks, ch)
488	manager.workerGo(sinfo, poolFunc)
489}
490
491func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
492	select {
493	case <-sinfo.init:
494	default:
495		// TODO find a better way to drop things until initialized
496		util.PutBytes(msg.Message)
497		return
498	}
499	sinfo.bytesSent += uint64(len(msg.Message))
500	coords := append([]byte(nil), sinfo.coords...)
501	if msg.FlowKey != 0 {
502		coords = append(coords, 0)
503		coords = append(coords, wire_encode_uint64(msg.FlowKey)...)
504	}
505	p := wire_trafficPacket{
506		Coords: coords,
507		Handle: sinfo.theirHandle,
508		Nonce:  sinfo.myNonce,
509	}
510	sinfo.myNonce.Increment()
511	k := sinfo.sharedSesKey
512	ch := make(chan func(), 1)
513	poolFunc := func() {
514		p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
515		callback := func() {
516			// Encoding may block on a util.GetBytes(), so kept out of the worker pool
517			packet := p.encode()
518			// Cleanup
519			util.PutBytes(msg.Message)
520			util.PutBytes(p.Payload)
521			// Send the packet
522			// TODO replace this with a send to the peer struct if that becomes an actor
523			sinfo.sessions.router.Act(sinfo, func() {
524				sinfo.sessions.router.out(packet)
525			})
526		}
527		ch <- callback
528		sinfo.checkCallbacks()
529	}
530	sinfo.callbacks = append(sinfo.callbacks, ch)
531	manager.workerGo(sinfo, poolFunc)
532}
533
534func (sinfo *sessionInfo) checkCallbacks() {
535	sinfo.Act(nil, func() {
536		if len(sinfo.callbacks) > 0 {
537			select {
538			case callback := <-sinfo.callbacks[0]:
539				sinfo.callbacks = sinfo.callbacks[1:]
540				callback()
541				sinfo.checkCallbacks()
542			default:
543			}
544		}
545	})
546}
547