1package gostream 2 3import ( 4 "context" 5 "net" 6 7 "github.com/libp2p/go-libp2p-core/host" 8 "github.com/libp2p/go-libp2p-core/network" 9 "github.com/libp2p/go-libp2p-core/protocol" 10) 11 12// listener is an implementation of net.Listener which handles 13// http-tagged streams from a libp2p connection. 14// A listener can be built with Listen() 15type listener struct { 16 host host.Host 17 ctx context.Context 18 tag protocol.ID 19 cancel func() 20 streamCh chan network.Stream 21} 22 23// Accept returns the next a connection to this listener. 24// It blocks if there are no connections. Under the hood, 25// connections are libp2p streams. 26func (l *listener) Accept() (net.Conn, error) { 27 select { 28 case s := <-l.streamCh: 29 return newConn(s), nil 30 case <-l.ctx.Done(): 31 return nil, l.ctx.Err() 32 } 33} 34 35// Close terminates this listener. It will no longer handle any 36// incoming streams 37func (l *listener) Close() error { 38 l.cancel() 39 l.host.RemoveStreamHandler(l.tag) 40 return nil 41} 42 43// Addr returns the address for this listener, which is its libp2p Peer ID. 44func (l *listener) Addr() net.Addr { 45 return &addr{l.host.ID()} 46} 47 48// Listen provides a standard net.Listener ready to accept "connections". 49// Under the hood, these connections are libp2p streams tagged with the 50// given protocol.ID. 51func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { 52 ctx, cancel := context.WithCancel(context.Background()) 53 54 l := &listener{ 55 host: h, 56 ctx: ctx, 57 cancel: cancel, 58 tag: tag, 59 streamCh: make(chan network.Stream), 60 } 61 62 h.SetStreamHandler(tag, func(s network.Stream) { 63 select { 64 case l.streamCh <- s: 65 case <-ctx.Done(): 66 s.Reset() 67 } 68 }) 69 70 return l, nil 71} 72