1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package trust
5
6import (
7	"context"
8	"math/rand"
9	"sort"
10	"sync"
11	"time"
12
13	"github.com/spacemonkeygo/monkit/v3"
14	"github.com/zeebo/errs"
15	"go.uber.org/zap"
16
17	"storj.io/common/identity"
18	"storj.io/common/rpc"
19	"storj.io/common/signing"
20	"storj.io/common/storj"
21	"storj.io/common/sync2"
22	"storj.io/storj/storagenode/satellites"
23)
24
25// Error is the default error class.
26var (
27	Error = errs.Class("trust")
28
29	mon = monkit.Package()
30)
31
32// IdentityResolver resolves peer identities from a node URL.
33type IdentityResolver interface {
34	// ResolveIdentity returns the peer identity of the peer located at the Node URL
35	ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error)
36}
37
38// IdentityResolverFunc is a convenience type for implementing IdentityResolver using a
39// function literal.
40type IdentityResolverFunc func(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error)
41
42// ResolveIdentity returns the peer identity of the peer located at the Node URL.
43func (fn IdentityResolverFunc) ResolveIdentity(ctx context.Context, url storj.NodeURL) (*identity.PeerIdentity, error) {
44	return fn(ctx, url)
45}
46
47// Dialer implements an IdentityResolver using an RPC dialer.
48func Dialer(dialer rpc.Dialer) IdentityResolver {
49	return IdentityResolverFunc(func(ctx context.Context, url storj.NodeURL) (_ *identity.PeerIdentity, err error) {
50		defer mon.Task()(&ctx)(&err)
51
52		conn, err := dialer.DialNodeURL(ctx, url)
53		if err != nil {
54			return nil, err
55		}
56		defer func() { err = errs.Combine(err, conn.Close()) }()
57		return conn.PeerIdentity()
58	})
59}
60
// Pool implements different peer verifications.
//
// It tracks the set of trusted satellites (refreshed from the configured
// trust list) and answers trust, signee and node-URL queries against it.
//
// architecture: Service
type Pool struct {
	// log receives refresh scheduling and trust-change messages.
	log *zap.Logger
	// resolver looks up a satellite's peer identity on demand (see GetSignee).
	resolver IdentityResolver
	// refreshInterval is the mean delay between refreshes in Run; the actual
	// delay is randomized by jitter.
	refreshInterval time.Duration

	// listMu serializes access to list, which is designed to be used by a
	// single goroutine at a time (see fetchURLs).
	listMu sync.Mutex
	list   *List

	// satellitesDB is updated with each trusted satellite's address after a
	// refresh in Run.
	satellitesDB satellites.DB

	// satellitesMu guards the satellites map: Refresh mutates it under the
	// write lock and getInfo reads it under the read lock.
	satellitesMu sync.RWMutex
	satellites   map[storj.NodeID]*satelliteInfoCache
}

// satelliteInfoCache caches identity information about a satellite.
type satelliteInfoCache struct {
	// mu is held by GetSignee while it checks, resolves and stores identity,
	// so only one resolution per satellite runs at a time.
	mu sync.Mutex
	// url is the satellite's node URL as last seen in the trust list.
	url storj.NodeURL
	// identity is the resolved peer identity; nil until GetSignee resolves it,
	// and reset to nil by Refresh when the satellite's address changes.
	identity *identity.PeerIdentity
}
84
85// NewPool creates a new trust pool of the specified list of trusted satellites.
86func NewPool(log *zap.Logger, resolver IdentityResolver, config Config, satellitesDB satellites.DB) (*Pool, error) {
87	// TODO: preload all satellite peer identities
88
89	cache, err := LoadCache(config.CachePath)
90	if err != nil {
91		return nil, err
92	}
93
94	list, err := NewList(log, config.Sources, config.Exclusions.Rules, cache)
95	if err != nil {
96		return nil, err
97	}
98
99	return &Pool{
100		log:             log,
101		resolver:        resolver,
102		refreshInterval: config.RefreshInterval,
103		list:            list,
104		satellitesDB:    satellitesDB,
105		satellites:      make(map[storj.NodeID]*satelliteInfoCache),
106	}, nil
107}
108
109// Run periodically refreshes the pool. The initial refresh is intended to
110// happen before run is call. Therefore Run does not refresh right away.
111func (pool *Pool) Run(ctx context.Context) error {
112	for {
113		refreshAfter := jitter(pool.refreshInterval)
114		pool.log.Info("Scheduling next refresh", zap.Duration("after", refreshAfter))
115		if !sync2.Sleep(ctx, refreshAfter) {
116			return ctx.Err()
117		}
118		if err := pool.Refresh(ctx); err != nil {
119			pool.log.Error("Failed to refresh", zap.Error(err))
120			return err
121		}
122
123		for _, trustedSatellite := range pool.satellites {
124			if err := pool.satellitesDB.SetAddress(ctx, trustedSatellite.url.ID, trustedSatellite.url.Address); err != nil {
125				return err
126			}
127		}
128	}
129}
130
131// VerifySatelliteID checks whether id corresponds to a trusted satellite.
132func (pool *Pool) VerifySatelliteID(ctx context.Context, id storj.NodeID) (err error) {
133	defer mon.Task()(&ctx)(&err)
134
135	_, err = pool.getInfo(id)
136	return err
137}
138
139// GetSignee gets the corresponding signee for verifying signatures.
140// It ignores passed in ctx cancellation to avoid miscaching between concurrent requests.
141func (pool *Pool) GetSignee(ctx context.Context, id storj.NodeID) (_ signing.Signee, err error) {
142	defer mon.Task()(&ctx)(&err)
143
144	info, err := pool.getInfo(id)
145	if err != nil {
146		return nil, err
147	}
148
149	info.mu.Lock()
150	defer info.mu.Unlock()
151
152	if info.identity == nil {
153		identity, err := pool.resolver.ResolveIdentity(ctx, info.url)
154		if err != nil {
155			return nil, Error.Wrap(err)
156		}
157		info.identity = identity
158	}
159
160	return signing.SigneeFromPeerIdentity(info.identity), nil
161}
162
163// GetSatellites returns a slice containing all trusted satellites.
164func (pool *Pool) GetSatellites(ctx context.Context) (satellites []storj.NodeID) {
165	defer mon.Task()(&ctx)(nil)
166	for sat := range pool.satellites {
167		satellites = append(satellites, sat)
168	}
169	sort.Sort(storj.NodeIDList(satellites))
170	return satellites
171}
172
173// GetNodeURL returns the node url of a satellite in the trusted list.
174func (pool *Pool) GetNodeURL(ctx context.Context, id storj.NodeID) (_ storj.NodeURL, err error) {
175	defer mon.Task()(&ctx)(&err)
176
177	info, err := pool.getInfo(id)
178	if err != nil {
179		return storj.NodeURL{}, err
180	}
181	return info.url, nil
182}
183
184// Refresh refreshes the set of trusted satellites in the pool. Concurrent
185// callers will be synchronized so only one proceeds at a time.
186func (pool *Pool) Refresh(ctx context.Context) error {
187	urls, err := pool.fetchURLs(ctx)
188	if err != nil {
189		return err
190	}
191
192	pool.satellitesMu.Lock()
193	defer pool.satellitesMu.Unlock()
194
195	// add/update trusted IDs
196	trustedIDs := make(map[storj.NodeID]struct{})
197	for _, url := range urls {
198		trustedIDs[url.ID] = struct{}{}
199
200		info, ok := pool.satellites[url.ID]
201		if !ok {
202			info = &satelliteInfoCache{
203				url: url,
204			}
205			pool.log.Debug("Satellite is trusted", zap.String("id", url.ID.String()))
206			pool.satellites[url.ID] = info
207		}
208
209		// update the URL address and reset the identity if it changed
210		if info.url.Address != url.Address {
211			pool.log.Debug("Satellite address updated; identity cache purged",
212				zap.String("id", url.ID.String()),
213				zap.String("old", info.url.Address),
214				zap.String("new", url.Address),
215			)
216			info.url.Address = url.Address
217			info.identity = nil
218		}
219	}
220
221	// remove trusted IDs that are no longer in the URL list
222	for id := range pool.satellites {
223		if _, ok := trustedIDs[id]; !ok {
224			pool.log.Debug("Satellite is no longer trusted", zap.String("id", id.String()))
225			delete(pool.satellites, id)
226		}
227	}
228
229	return nil
230}
231
232func (pool *Pool) getInfo(id storj.NodeID) (*satelliteInfoCache, error) {
233	pool.satellitesMu.RLock()
234	defer pool.satellitesMu.RUnlock()
235
236	info, ok := pool.satellites[id]
237	if !ok {
238		return nil, Error.New("satellite %q is untrusted", id)
239	}
240	return info, nil
241}
242
243func (pool *Pool) fetchURLs(ctx context.Context) ([]storj.NodeURL, error) {
244	// Typically there will only be one caller of refresh (i.e. Run()) but
245	// if at some point we might want  on-demand refresh, and *List is designed
246	// to be used by a single goroutine (don't want multiple callers racing
247	// on the cache, etc).
248	pool.listMu.Lock()
249	defer pool.listMu.Unlock()
250	return pool.list.FetchURLs(ctx)
251}
252
// jitter returns a random duration drawn from a normal distribution with mean
// t and standard deviation t/4. Results that are not positive are clamped to
// one nanosecond, so the returned duration is always > 0.
func jitter(t time.Duration) time.Duration {
	mean := float64(t)
	stddev := float64(t / 4)
	sample := mean + stddev*rand.NormFloat64()
	if sample > 0 {
		return time.Duration(sample)
	}
	return time.Duration(1)
}
260