1package blankhost 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 9 "github.com/libp2p/go-libp2p-core/connmgr" 10 "github.com/libp2p/go-libp2p-core/event" 11 "github.com/libp2p/go-libp2p-core/host" 12 "github.com/libp2p/go-libp2p-core/network" 13 "github.com/libp2p/go-libp2p-core/peer" 14 "github.com/libp2p/go-libp2p-core/peerstore" 15 "github.com/libp2p/go-libp2p-core/protocol" 16 "github.com/libp2p/go-libp2p-core/record" 17 18 "github.com/libp2p/go-eventbus" 19 20 logging "github.com/ipfs/go-log" 21 22 ma "github.com/multiformats/go-multiaddr" 23 mstream "github.com/multiformats/go-multistream" 24) 25 26var log = logging.Logger("blankhost") 27 28// BlankHost is the thinnest implementation of the host.Host interface 29type BlankHost struct { 30 n network.Network 31 mux *mstream.MultistreamMuxer 32 cmgr connmgr.ConnManager 33 eventbus event.Bus 34 emitters struct { 35 evtLocalProtocolsUpdated event.Emitter 36 } 37} 38 39type config struct { 40 cmgr connmgr.ConnManager 41} 42 43type Option = func(cfg *config) 44 45func WithConnectionManager(cmgr connmgr.ConnManager) Option { 46 return func(cfg *config) { 47 cfg.cmgr = cmgr 48 } 49} 50 51func NewBlankHost(n network.Network, options ...Option) *BlankHost { 52 cfg := config{ 53 cmgr: &connmgr.NullConnMgr{}, 54 } 55 for _, opt := range options { 56 opt(&cfg) 57 } 58 59 bh := &BlankHost{ 60 n: n, 61 cmgr: cfg.cmgr, 62 mux: mstream.NewMultistreamMuxer(), 63 eventbus: eventbus.NewBus(), 64 } 65 66 // subscribe the connection manager to network notifications (has no effect with NullConnMgr) 67 n.Notify(bh.cmgr.Notifee()) 68 69 var err error 70 if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil { 71 return nil 72 } 73 74 n.SetStreamHandler(bh.newStreamHandler) 75 76 // persist a signed peer record for self to the peerstore. 77 if err := bh.initSignedRecord(); err != nil { 78 log.Errorf("error creating blank host, err=%s", err) 79 return nil 80 } 81 82 return bh 83} 84 85func (bh *BlankHost) initSignedRecord() error { 86 cab, ok := peerstore.GetCertifiedAddrBook(bh.n.Peerstore()) 87 if !ok { 88 log.Error("peerstore does not support signed records") 89 return errors.New("peerstore does not support signed records") 90 } 91 rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{bh.ID(), bh.Addrs()}) 92 ev, err := record.Seal(rec, bh.Peerstore().PrivKey(bh.ID())) 93 if err != nil { 94 log.Errorf("failed to create signed record for self, err=%s", err) 95 return fmt.Errorf("failed to create signed record for self, err=%s", err) 96 } 97 _, err = cab.ConsumePeerRecord(ev, peerstore.PermanentAddrTTL) 98 if err != nil { 99 log.Errorf("failed to persist signed record to peerstore,err=%s", err) 100 return fmt.Errorf("failed to persist signed record for self, err=%s", err) 101 } 102 return err 103} 104 105var _ host.Host = (*BlankHost)(nil) 106 107func (bh *BlankHost) Addrs() []ma.Multiaddr { 108 addrs, err := bh.n.InterfaceListenAddresses() 109 if err != nil { 110 log.Debug("error retrieving network interface addrs: ", err) 111 return nil 112 } 113 114 return addrs 115} 116 117func (bh *BlankHost) Close() error { 118 return bh.n.Close() 119} 120 121func (bh *BlankHost) Connect(ctx context.Context, ai peer.AddrInfo) error { 122 // absorb addresses into peerstore 123 bh.Peerstore().AddAddrs(ai.ID, ai.Addrs, peerstore.TempAddrTTL) 124 125 cs := bh.n.ConnsToPeer(ai.ID) 126 if len(cs) > 0 { 127 return nil 128 } 129 130 _, err := bh.Network().DialPeer(ctx, ai.ID) 131 return err 132} 133 134func (bh *BlankHost) Peerstore() peerstore.Peerstore { 135 return bh.n.Peerstore() 136} 137 138func (bh *BlankHost) ID() peer.ID { 139 return bh.n.LocalPeer() 140} 141 142func (bh *BlankHost) NewStream(ctx context.Context, p peer.ID, protos ...protocol.ID) (network.Stream, error) { 143 s, err := bh.n.NewStream(ctx, p) 144 if err != nil { 145 return nil, err 146 } 147 148 var protoStrs []string 149 for _, pid := range protos { 150 protoStrs = append(protoStrs, string(pid)) 151 } 152 153 selected, err := mstream.SelectOneOf(protoStrs, s) 154 if err != nil { 155 s.Reset() 156 return nil, err 157 } 158 159 selpid := protocol.ID(selected) 160 s.SetProtocol(selpid) 161 bh.Peerstore().AddProtocols(p, selected) 162 163 return s, nil 164} 165 166func (bh *BlankHost) RemoveStreamHandler(pid protocol.ID) { 167 bh.Mux().RemoveHandler(string(pid)) 168 bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ 169 Removed: []protocol.ID{pid}, 170 }) 171} 172 173func (bh *BlankHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { 174 bh.Mux().AddHandler(string(pid), func(p string, rwc io.ReadWriteCloser) error { 175 is := rwc.(network.Stream) 176 is.SetProtocol(protocol.ID(p)) 177 handler(is) 178 return nil 179 }) 180 bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ 181 Added: []protocol.ID{pid}, 182 }) 183} 184 185func (bh *BlankHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler network.StreamHandler) { 186 bh.Mux().AddHandlerWithFunc(string(pid), m, func(p string, rwc io.ReadWriteCloser) error { 187 is := rwc.(network.Stream) 188 is.SetProtocol(protocol.ID(p)) 189 handler(is) 190 return nil 191 }) 192 bh.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{ 193 Added: []protocol.ID{pid}, 194 }) 195} 196 197// newStreamHandler is the remote-opened stream handler for network.Network 198func (bh *BlankHost) newStreamHandler(s network.Stream) { 199 protoID, handle, err := bh.Mux().Negotiate(s) 200 if err != nil { 201 log.Infow("protocol negotiation failed", "error", err) 202 s.Reset() 203 return 204 } 205 206 s.SetProtocol(protocol.ID(protoID)) 207 208 go handle(protoID, s) 209} 210 211// TODO: i'm not sure this really needs to be here 212func (bh *BlankHost) Mux() protocol.Switch { 213 return bh.mux 214} 215 216// TODO: also not sure this fits... Might be better ways around this (leaky abstractions) 217func (bh *BlankHost) Network() network.Network { 218 return bh.n 219} 220 221func (bh *BlankHost) ConnManager() connmgr.ConnManager { 222 return bh.cmgr 223} 224 225func (bh *BlankHost) EventBus() event.Bus { 226 return bh.eventbus 227} 228