1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package trust 5 6import ( 7 "context" 8 "math/rand" 9 "sort" 10 "sync" 11 "time" 12 13 "github.com/spacemonkeygo/monkit/v3" 14 "github.com/zeebo/errs" 15 "go.uber.org/zap" 16 17 "storj.io/common/identity" 18 "storj.io/common/rpc" 19 "storj.io/common/signing" 20 "storj.io/common/storj" 21 "storj.io/common/sync2" 22 "storj.io/storj/storagenode/satellites" 23) 24 25// Error is the default error class. 26var ( 27 Error = errs.Class("trust") 28 29 mon = monkit.Package() 30) 31 32// IdentityResolver resolves peer identities from a node URL. 33type IdentityResolver interface { 34 // ResolveIdentity returns the peer identity of the peer located at the Node URL 35 ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) 36} 37 38// IdentityResolverFunc is a convenience type for implementing IdentityResolver using a 39// function literal. 40type IdentityResolverFunc func(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) 41 42// ResolveIdentity returns the peer identity of the peer located at the Node URL. 43func (fn IdentityResolverFunc) ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) { 44 return fn(ctx, url) 45} 46 47// Dialer implements an IdentityResolver using an RPC dialer. 48func Dialer(dialer rpc.Dialer) IdentityResolver { 49 return IdentityResolverFunc(func(ctx context.Context, url storj.NodeURL) (_ *identity.PeerIdentity, err error) { 50 defer mon.Task()(&ctx)(&err) 51 52 conn, err := dialer.DialNodeURL(ctx, url) 53 if err != nil { 54 return nil, err 55 } 56 defer func() { err = errs.Combine(err, conn.Close()) }() 57 return conn.PeerIdentity() 58 }) 59} 60 61// Pool implements different peer verifications. 62// 63// architecture: Service 64type Pool struct { 65 log *zap.Logger 66 resolver IdentityResolver 67 refreshInterval time.Duration 68 69 listMu sync.Mutex 70 list *List 71 72 satellitesDB satellites.DB 73 74 satellitesMu sync.RWMutex 75 satellites map[storj.NodeID]*satelliteInfoCache 76} 77 78// satelliteInfoCache caches identity information about a satellite. 79type satelliteInfoCache struct { 80 mu sync.Mutex 81 url storj.NodeURL 82 identity *identity.PeerIdentity 83} 84 85// NewPool creates a new trust pool of the specified list of trusted satellites. 86func NewPool(log *zap.Logger, resolver IdentityResolver, config Config, satellitesDB satellites.DB) (*Pool, error) { 87 // TODO: preload all satellite peer identities 88 89 cache, err := LoadCache(config.CachePath) 90 if err != nil { 91 return nil, err 92 } 93 94 list, err := NewList(log, config.Sources, config.Exclusions.Rules, cache) 95 if err != nil { 96 return nil, err 97 } 98 99 return &Pool{ 100 log: log, 101 resolver: resolver, 102 refreshInterval: config.RefreshInterval, 103 list: list, 104 satellitesDB: satellitesDB, 105 satellites: make(map[storj.NodeID]*satelliteInfoCache), 106 }, nil 107} 108 109// Run periodically refreshes the pool. The initial refresh is intended to 110// happen before run is call. Therefore Run does not refresh right away. 111func (pool *Pool) Run(ctx context.Context) error { 112 for { 113 refreshAfter := jitter(pool.refreshInterval) 114 pool.log.Info("Scheduling next refresh", zap.Duration("after", refreshAfter)) 115 if !sync2.Sleep(ctx, refreshAfter) { 116 return ctx.Err() 117 } 118 if err := pool.Refresh(ctx); err != nil { 119 pool.log.Error("Failed to refresh", zap.Error(err)) 120 return err 121 } 122 123 for _, trustedSatellite := range pool.satellites { 124 if err := pool.satellitesDB.SetAddress(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address); err != nil { 125 return err 126 } 127 } 128 } 129} 130 131// VerifySatelliteID checks whether id corresponds to a trusted satellite. 132func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) (err error) { 133 defer mon.Task()(&ctx)(&err) 134 135 _, err = pool.getInfo(id) 136 return err 137} 138 139// GetSignee gets the corresponding signee for verifying signatures. 140// It ignores passed in ctx cancellation to avoid miscaching between concurrent requests. 141func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (_ signing.Signee, err error) { 142 defer mon.Task()(&ctx)(&err) 143 144 info, err := pool.getInfo(id) 145 if err != nil { 146 return nil, err 147 } 148 149 info.mu.Lock() 150 defer info.mu.Unlock() 151 152 if info.identity == nil { 153 identity, err := pool.resolver.ResolveIdentity(ctx, info.url) 154 if err != nil { 155 return nil, Error.Wrap(err) 156 } 157 info.identity = identity 158 } 159 160 return signing.SigneeFromPeerIdentity(info.identity), nil 161} 162 163// GetSatellites returns a slice containing all trusted satellites. 164func (pool *Pool) GetSatellites(ctx context.Context) (satellites []storj.NodeID) { 165 defer mon.Task()(&ctx)(nil) 166 for sat := range pool.satellites { 167 satellites = append(satellites, sat) 168 } 169 sort.Sort(storj.NodeIDList(satellites)) 170 return satellites 171} 172 173// GetNodeURL returns the node url of a satellite in the trusted list. 174func (pool *Pool) GetNodeURL(ctx context.Context, id storj.NodeID) (_ storj.NodeURL, err error) { 175 defer mon.Task()(&ctx)(&err) 176 177 info, err := pool.getInfo(id) 178 if err != nil { 179 return storj.NodeURL{}, err 180 } 181 return info.url, nil 182} 183 184// Refresh refreshes the set of trusted satellites in the pool. Concurrent 185// callers will be synchronized so only one proceeds at a time. 186func (pool *Pool) Refresh(ctx context.Context) error { 187 urls, err := pool.fetchURLs(ctx) 188 if err != nil { 189 return err 190 } 191 192 pool.satellitesMu.Lock() 193 defer pool.satellitesMu.Unlock() 194 195 // add/update trusted IDs 196 trustedIDs := make(map[storj.NodeID]struct{}) 197 for _, url := range urls { 198 trustedIDs[url.ID] = struct{}{} 199 200 info, ok := pool.satellites[url.ID] 201 if !ok { 202 info = &satelliteInfoCache{ 203 url: url, 204 } 205 pool.log.Debug("Satellite is trusted", zap.String("id", url.ID.String())) 206 pool.satellites[url.ID] = info 207 } 208 209 // update the URL address and reset the identity if it changed 210 if info.url.Address != url.Address { 211 pool.log.Debug("Satellite address updated; identity cache purged", 212 zap.String("id", url.ID.String()), 213 zap.String("old", info.url.Address), 214 zap.String("new", url.Address), 215 ) 216 info.url.Address = url.Address 217 info.identity = nil 218 } 219 } 220 221 // remove trusted IDs that are no longer in the URL list 222 for id := range pool.satellites { 223 if _, ok := trustedIDs[id]; !ok { 224 pool.log.Debug("Satellite is no longer trusted", zap.String("id", id.String())) 225 delete(pool.satellites, id) 226 } 227 } 228 229 return nil 230} 231 232func (pool *Pool) getInfo(id storj.NodeID) (*satelliteInfoCache, error) { 233 pool.satellitesMu.RLock() 234 defer pool.satellitesMu.RUnlock() 235 236 info, ok := pool.satellites[id] 237 if !ok { 238 return nil, Error.New("satellite %q is untrusted", id) 239 } 240 return info, nil 241} 242 243func (pool *Pool) fetchURLs(ctx context.Context) ([]storj.NodeURL, error) { 244 // Typically there will only be one caller of refresh (i.e. Run()) but 245 // if at some point we might want on-demand refresh, and *List is designed 246 // to be used by a single goroutine (don't want multiple callers racing 247 // on the cache, etc). 248 pool.listMu.Lock() 249 defer pool.listMu.Unlock() 250 return pool.list.FetchURLs(ctx) 251} 252 253func jitter(t time.Duration) time.Duration { 254 nanos := rand.NormFloat64()*float64(t/4) + float64(t) 255 if nanos <= 0 { 256 nanos = 1 257 } 258 return time.Duration(nanos) 259} 260