1package mocknet 2 3import ( 4 "container/list" 5 "context" 6 "strconv" 7 "sync" 8 "sync/atomic" 9 10 process "github.com/jbenet/goprocess" 11 ic "github.com/libp2p/go-libp2p-core/crypto" 12 "github.com/libp2p/go-libp2p-core/network" 13 "github.com/libp2p/go-libp2p-core/peer" 14 ma "github.com/multiformats/go-multiaddr" 15 manet "github.com/multiformats/go-multiaddr/net" 16) 17 18var connCounter int64 19 20// conn represents one side's perspective of a 21// live connection between two peers. 22// it goes over a particular link. 23type conn struct { 24 notifLk sync.Mutex 25 26 id int64 27 28 local peer.ID 29 remote peer.ID 30 31 localAddr ma.Multiaddr 32 remoteAddr ma.Multiaddr 33 34 localPrivKey ic.PrivKey 35 remotePubKey ic.PubKey 36 37 net *peernet 38 link *link 39 rconn *conn // counterpart 40 streams list.List 41 stat network.Stat 42 43 pairProc, connProc process.Process 44 45 sync.RWMutex 46} 47 48func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn { 49 c := &conn{net: ln, link: l, pairProc: p} 50 c.local = ln.peer 51 c.remote = rn.peer 52 c.stat = network.Stat{Direction: dir} 53 c.id = atomic.AddInt64(&connCounter, 1) 54 55 c.localAddr = ln.ps.Addrs(ln.peer)[0] 56 for _, a := range rn.ps.Addrs(rn.peer) { 57 if !manet.IsIPUnspecified(a) { 58 c.remoteAddr = a 59 break 60 } 61 } 62 if c.remoteAddr == nil { 63 c.remoteAddr = rn.ps.Addrs(rn.peer)[0] 64 } 65 66 c.localPrivKey = ln.ps.PrivKey(ln.peer) 67 c.remotePubKey = rn.ps.PubKey(rn.peer) 68 c.connProc = process.WithParent(c.pairProc) 69 return c 70} 71 72func (c *conn) ID() string { 73 return strconv.FormatInt(c.id, 10) 74} 75 76func (c *conn) Close() error { 77 return c.pairProc.Close() 78} 79 80func (c *conn) setup() { 81 c.connProc.SetTeardown(c.teardown) 82} 83 84func (c *conn) teardown() error { 85 for _, s := range c.allStreams() { 86 s.Reset() 87 } 88 c.net.removeConn(c) 89 90 go func() { 91 c.notifLk.Lock() 92 defer c.notifLk.Unlock() 93 c.net.notifyAll(func(n network.Notifiee) { 94 n.Disconnected(c.net, c) 95 }) 96 }() 97 return nil 98} 99 100func (c *conn) addStream(s *stream) { 101 c.Lock() 102 s.conn = c 103 c.streams.PushBack(s) 104 s.notifLk.Lock() 105 defer s.notifLk.Unlock() 106 c.Unlock() 107 c.net.notifyAll(func(n network.Notifiee) { 108 n.OpenedStream(c.net, s) 109 }) 110} 111 112func (c *conn) removeStream(s *stream) { 113 c.Lock() 114 for e := c.streams.Front(); e != nil; e = e.Next() { 115 if s == e.Value { 116 c.streams.Remove(e) 117 break 118 } 119 } 120 c.Unlock() 121 122 go func() { 123 s.notifLk.Lock() 124 defer s.notifLk.Unlock() 125 s.conn.net.notifyAll(func(n network.Notifiee) { 126 n.ClosedStream(s.conn.net, s) 127 }) 128 }() 129} 130 131func (c *conn) allStreams() []network.Stream { 132 c.RLock() 133 defer c.RUnlock() 134 135 strs := make([]network.Stream, 0, c.streams.Len()) 136 for e := c.streams.Front(); e != nil; e = e.Next() { 137 s := e.Value.(*stream) 138 strs = append(strs, s) 139 } 140 return strs 141} 142 143func (c *conn) remoteOpenedStream(s *stream) { 144 c.addStream(s) 145 c.net.handleNewStream(s) 146} 147 148func (c *conn) openStream() *stream { 149 sl, sr := newStreamPair() 150 go c.rconn.remoteOpenedStream(sr) 151 c.addStream(sl) 152 return sl 153} 154 155func (c *conn) NewStream(context.Context) (network.Stream, error) { 156 log.Debugf("Conn.NewStreamWithProtocol: %s --> %s", c.local, c.remote) 157 158 s := c.openStream() 159 return s, nil 160} 161 162func (c *conn) GetStreams() []network.Stream { 163 return c.allStreams() 164} 165 166// LocalMultiaddr is the Multiaddr on this side 167func (c *conn) LocalMultiaddr() ma.Multiaddr { 168 return c.localAddr 169} 170 171// LocalPeer is the Peer on our side of the connection 172func (c *conn) LocalPeer() peer.ID { 173 return c.local 174} 175 176// LocalPrivateKey is the private key of the peer on our side. 177func (c *conn) LocalPrivateKey() ic.PrivKey { 178 return c.localPrivKey 179} 180 181// RemoteMultiaddr is the Multiaddr on the remote side 182func (c *conn) RemoteMultiaddr() ma.Multiaddr { 183 return c.remoteAddr 184} 185 186// RemotePeer is the Peer on the remote side 187func (c *conn) RemotePeer() peer.ID { 188 return c.remote 189} 190 191// RemotePublicKey is the private key of the peer on our side. 192func (c *conn) RemotePublicKey() ic.PubKey { 193 return c.remotePubKey 194} 195 196// Stat returns metadata about the connection 197func (c *conn) Stat() network.Stat { 198 return c.stat 199} 200