1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package trust 5 6import ( 7 "context" 8 9 "go.uber.org/zap" 10 11 "storj.io/common/storj" 12) 13 14// List represents a dynamic trust list. 15type List struct { 16 log *zap.Logger 17 sources Sources 18 rules Rules 19 cache *Cache 20} 21 22// NewList takes one or more sources, optional rules, and a cache and returns a new List. 23func NewList(log *zap.Logger, sources []Source, rules Rules, cache *Cache) (*List, error) { 24 // TODO: ideally we'd ensure there was at least one source configured since 25 // it doesn't make sense to run a storage node that doesn't trust any 26 // satellites, but unfortunately the check causes the backcompat tests to 27 // fail. 28 switch { 29 case log == nil: 30 return nil, Error.New("logger cannot be nil") 31 case cache == nil: 32 return nil, Error.New("cache cannot be nil") 33 } 34 return &List{ 35 log: log, 36 sources: sources, 37 rules: rules, 38 cache: cache, 39 }, nil 40} 41 42// FetchURLs returns a list of Node URLS for trusted Satellites. It queries 43// all of the configured sources for trust entries. Entries from non-fixed 44// sources are cached. If entries cannot be retrieved from a source, a 45// cached copy is used, if available. Otherwise, if there are no cached 46// entries available, the call will fail. The URLS are filtered before being 47// returned. 48func (list *List) FetchURLs(ctx context.Context) ([]storj.NodeURL, error) { 49 candidates, err := list.fetchEntries(ctx) 50 if err != nil { 51 return nil, err 52 } 53 54 byAddress := make(map[string]int) 55 entries := make([]Entry, 0, len(candidates)) 56 for _, entry := range candidates { 57 if !list.rules.IsTrusted(entry.SatelliteURL) { 58 continue 59 } 60 previousIdx, ok := byAddress[entry.SatelliteURL.Address()] 61 if ok { 62 previous := entries[previousIdx] 63 // An entry with the same address has already been aggregated. 64 // If the entry is authoritative and the the previous entry was not 65 // then replace the previous entry, otherwise ignore. 66 if entry.Authoritative && !previous.Authoritative { 67 entries[previousIdx] = entry 68 } 69 continue 70 } 71 72 byAddress[entry.SatelliteURL.Address()] = len(entries) 73 entries = append(entries, entry) 74 } 75 76 var urls []storj.NodeURL 77 for _, entry := range entries { 78 urls = append(urls, entry.SatelliteURL.NodeURL()) 79 } 80 return urls, nil 81} 82 83func (list *List) fetchEntries(ctx context.Context) (_ []Entry, err error) { 84 defer mon.Task()(&ctx)(&err) 85 86 var allEntries []Entry 87 for _, source := range list.sources { 88 sourceLog := list.log.With(zap.String("source", source.String())) 89 90 entries, err := source.FetchEntries(ctx) 91 if err != nil { 92 var ok bool 93 entries, ok = list.lookupCache(source) 94 if !ok { 95 sourceLog.Error("Failed to fetch URLs from source", zap.Error(err)) 96 return nil, Error.New("failed to fetch from source %q: %w", source.String(), err) 97 } 98 sourceLog.Warn("Failed to fetch URLs from source; used cache", zap.Error(err)) 99 } else { 100 sourceLog.Debug("Fetched URLs from source; updating cache", zap.Int("count", len(entries))) 101 list.updateCache(source, entries) 102 } 103 104 allEntries = append(allEntries, entries...) 105 } 106 107 if err := list.saveCache(ctx); err != nil { 108 list.log.Warn("Unable to save list cache", zap.Error(err)) 109 } 110 return allEntries, nil 111} 112 113func (list *List) lookupCache(source Source) ([]Entry, bool) { 114 // Static sources are not cached 115 if source.Static() { 116 return nil, false 117 } 118 return list.cache.Lookup(source.String()) 119} 120 121func (list *List) updateCache(source Source, entries []Entry) { 122 // Static sources are not cached 123 if source.Static() { 124 return 125 } 126 list.cache.Set(source.String(), entries) 127} 128 129func (list *List) saveCache(ctx context.Context) error { 130 return list.cache.Save(ctx) 131} 132