1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package storagenode
5
6import (
7	"context"
8	"errors"
9	"fmt"
10	"net"
11	"net/http"
12	"path/filepath"
13	"time"
14
15	"github.com/spacemonkeygo/monkit/v3"
16	"github.com/zeebo/errs"
17	"go.uber.org/zap"
18	"golang.org/x/sync/errgroup"
19
20	"storj.io/common/identity"
21	"storj.io/common/pb"
22	"storj.io/common/peertls/extensions"
23	"storj.io/common/peertls/tlsopts"
24	"storj.io/common/rpc"
25	"storj.io/common/signing"
26	"storj.io/common/storj"
27	"storj.io/private/debug"
28	"storj.io/private/version"
29	"storj.io/storj/private/lifecycle"
30	"storj.io/storj/private/multinodepb"
31	"storj.io/storj/private/server"
32	"storj.io/storj/private/version/checker"
33	"storj.io/storj/storage"
34	"storj.io/storj/storage/filestore"
35	"storj.io/storj/storagenode/apikeys"
36	"storj.io/storj/storagenode/bandwidth"
37	"storj.io/storj/storagenode/collector"
38	"storj.io/storj/storagenode/console"
39	"storj.io/storj/storagenode/console/consoleassets"
40	"storj.io/storj/storagenode/console/consoleserver"
41	"storj.io/storj/storagenode/contact"
42	"storj.io/storj/storagenode/gracefulexit"
43	"storj.io/storj/storagenode/inspector"
44	"storj.io/storj/storagenode/internalpb"
45	"storj.io/storj/storagenode/monitor"
46	"storj.io/storj/storagenode/multinode"
47	"storj.io/storj/storagenode/nodestats"
48	"storj.io/storj/storagenode/notifications"
49	"storj.io/storj/storagenode/operator"
50	"storj.io/storj/storagenode/orders"
51	"storj.io/storj/storagenode/payouts"
52	"storj.io/storj/storagenode/payouts/estimatedpayouts"
53	"storj.io/storj/storagenode/pieces"
54	"storj.io/storj/storagenode/piecestore"
55	"storj.io/storj/storagenode/piecestore/usedserials"
56	"storj.io/storj/storagenode/piecetransfer"
57	"storj.io/storj/storagenode/preflight"
58	"storj.io/storj/storagenode/pricing"
59	"storj.io/storj/storagenode/reputation"
60	"storj.io/storj/storagenode/retain"
61	"storj.io/storj/storagenode/satellites"
62	"storj.io/storj/storagenode/storagenodedb"
63	"storj.io/storj/storagenode/storageusage"
64	"storj.io/storj/storagenode/trust"
65	version2 "storj.io/storj/storagenode/version"
66)
67
68var (
69	mon = monkit.Package()
70)
71
// DB is the master database for Storage Node.
//
// It groups the accessors for every sub-database that New hands to the
// individual services, plus migration, preflight and shutdown hooks.
//
// architecture: Master Database
type DB interface {
	// MigrateToLatest initializes the database
	MigrateToLatest(ctx context.Context) error
	// Close closes the database
	Close() error

	// Pieces returns the blob storage used for pieces.
	Pieces() storage.Blobs

	// Orders returns the orders database.
	Orders() orders.DB
	// V0PieceInfo returns the V0 piece info database.
	V0PieceInfo() pieces.V0PieceInfoDB
	// PieceExpirationDB returns the piece expiration database.
	PieceExpirationDB() pieces.PieceExpirationDB
	// PieceSpaceUsedDB returns the piece space used database.
	PieceSpaceUsedDB() pieces.PieceSpaceUsedDB
	// Bandwidth returns the bandwidth database.
	Bandwidth() bandwidth.DB
	// Reputation returns the reputation database.
	Reputation() reputation.DB
	// StorageUsage returns the storage usage database.
	StorageUsage() storageusage.DB
	// Satellites returns the satellites database.
	Satellites() satellites.DB
	// Notifications returns the notifications database.
	Notifications() notifications.DB
	// Payout returns the payouts database.
	Payout() payouts.DB
	// Pricing returns the pricing database.
	Pricing() pricing.DB
	// APIKeys returns the API keys database.
	APIKeys() apikeys.DB

	// Preflight runs the database preflight check; what exactly is checked
	// is defined by the implementation.
	Preflight(ctx context.Context) error
}
98
// Config is all the configuration parameters for a Storage Node.
//
// Each field holds the configuration of one subsystem; New passes them on to
// the corresponding constructors.
type Config struct {
	// Identity is the identity configuration.
	Identity identity.Config

	// Server configures the public/private server, Debug the debug endpoints.
	Server server.Config
	Debug  debug.Config

	// Preflight, Contact and Operator configure the preflight checks, the
	// contact service and the operator details respectively.
	Preflight preflight.Config
	Contact   contact.Config
	Operator  operator.Config

	// TODO: flatten storage config and only keep the new one
	Storage   piecestore.OldConfig
	Storage2  piecestore.Config
	Collector collector.Config

	// Filestore is forwarded unchanged into the database config
	// (see DatabaseConfig).
	Filestore filestore.Config

	// Pieces configures the piece store.
	Pieces pieces.Config

	// Retain configures the retain service.
	Retain retain.Config

	// Nodestats configures the node stats cache.
	Nodestats nodestats.Config

	// Console configures the operator dashboard web server.
	Console consoleserver.Config

	// Version configures the version checker service.
	Version checker.Config

	// Bandwidth configures the bandwidth service.
	Bandwidth bandwidth.Config

	// GracefulExit configures graceful exit; New also uses its
	// MinDownloadTimeout and MinBytesPerSecond for the piece transfer service.
	GracefulExit gracefulexit.Config
}
131
132// DatabaseConfig returns the storagenodedb.Config that should be used with this Config.
133func (config *Config) DatabaseConfig() storagenodedb.Config {
134	dbdir := config.Storage2.DatabaseDir
135	if dbdir == "" {
136		dbdir = config.Storage.Path
137	}
138	return storagenodedb.Config{
139		Storage:   config.Storage.Path,
140		Info:      filepath.Join(dbdir, "piecestore.db"),
141		Info2:     filepath.Join(dbdir, "info.db"),
142		Pieces:    config.Storage.Path,
143		Filestore: config.Filestore,
144	}
145}
146
147// Verify verifies whether configuration is consistent and acceptable.
148func (config *Config) Verify(log *zap.Logger) error {
149	err := config.Operator.Verify(log)
150	if err != nil {
151		return err
152	}
153
154	if config.Contact.ExternalAddress != "" {
155		err := isAddressValid(config.Contact.ExternalAddress)
156		if err != nil {
157			return errs.New("invalid contact.external-address: %v", err)
158		}
159	}
160
161	if config.Server.Address != "" {
162		err := isAddressValid(config.Server.Address)
163		if err != nil {
164			return errs.New("invalid server.address: %v", err)
165		}
166	}
167
168	return nil
169}
170
171func isAddressValid(addrstring string) error {
172	addr, port, err := net.SplitHostPort(addrstring)
173	if err != nil || port == "" {
174		return errs.New("split host-port %q failed: %+v", addrstring, err)
175	}
176	if addr == "" {
177		return nil
178	}
179	resolvedhosts, err := net.LookupHost(addr)
180	if err != nil || len(resolvedhosts) == 0 {
181		return errs.New("lookup %q failed: %+v", addr, err)
182	}
183
184	return nil
185}
186
// Peer is the representation of a Storage Node.
//
// It holds every dependency, service and endpoint wired together by New.
// Run starts the Servers and Services lifecycle groups and Close closes them.
//
// architecture: Peer
type Peer struct {
	// core dependencies
	Log         *zap.Logger
	Identity    *identity.FullIdentity
	DB          DB
	UsedSerials *usedserials.Table
	OrdersStore *orders.FileStore

	// Servers and Services are the lifecycle groups that New registers
	// runnable items into.
	Servers  *lifecycle.Group
	Services *lifecycle.Group

	// Dialer is the default RPC dialer built from the node's TLS options.
	Dialer rpc.Dialer

	// Server is the node's server; it backs Addr and PrivateAddr.
	Server *server.Server

	Version struct {
		Chore   *version2.Chore
		Service *checker.Service
	}

	// Debug holds the debug server and its listener; the listener may be
	// unset when no debug address is configured or listening failed.
	Debug struct {
		Listener net.Listener
		Server   *debug.Server
	}

	// services and endpoints
	// TODO: similar grouping to satellite.Core

	// Preflight holds the local time check that Run performs before starting.
	Preflight struct {
		LocalTime *preflight.LocalTime
	}

	Contact struct {
		Service   *contact.Service
		Chore     *contact.Chore
		Endpoint  *contact.Endpoint
		PingStats *contact.PingStats
	}

	Estimation struct {
		Service *estimatedpayouts.Service
	}

	Storage2 struct {
		// TODO: lift things outside of it to organize better
		Trust         *trust.Pool
		Store         *pieces.Store
		TrashChore    *pieces.TrashChore
		BlobsCache    *pieces.BlobsUsageCache
		CacheService  *pieces.CacheService
		RetainService *retain.Service
		PieceDeleter  *pieces.Deleter
		Endpoint      *piecestore.Endpoint
		Inspector     *inspector.Endpoint
		Monitor       *monitor.Service
		Orders        *orders.Service
	}

	Collector *collector.Service

	NodeStats struct {
		Service *nodestats.Service
		Cache   *nodestats.Cache
	}

	// Web server with web UI
	Console struct {
		Listener net.Listener
		Service  *console.Service
		Endpoint *consoleserver.Server
	}

	PieceTransfer struct {
		Service piecetransfer.Service
	}

	GracefulExit struct {
		Service      gracefulexit.Service
		Endpoint     *gracefulexit.Endpoint
		Chore        *gracefulexit.Chore
		BlobsCleaner *gracefulexit.BlobsCleaner
	}

	Notifications struct {
		Service *notifications.Service
	}

	Payout struct {
		Service  *payouts.Service
		Endpoint *payouts.Endpoint
	}

	Bandwidth *bandwidth.Service

	Reputation *reputation.Service

	// Multinode holds the endpoints registered for the multinode API.
	Multinode struct {
		Storage   *multinode.StorageEndpoint
		Bandwidth *multinode.BandwidthEndpoint
		Node      *multinode.NodeEndpoint
		Payout    *multinode.PayoutEndpoint
	}
}
293
// New creates a new Storage Node.
//
// It builds the Peer from the given logger, identity and database and then
// wires up, in order: notifications, debug, version, server, trust pool,
// preflight, contact, storage, payouts, reputation, node stats, estimation,
// console, inspector, piece transfer, graceful exit, collector, bandwidth
// and the multinode endpoints. Runnable items are registered in the Servers
// or Services lifecycle group; nothing is started here (see Run).
//
// When any setup step fails, New returns a nil Peer together with the setup
// error combined with the result of closing whatever was registered so far.
// A failure to listen on the debug address is the exception: it is only
// logged and setup continues.
func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB extensions.RevocationDB, config Config, versionInfo version.Info, atomicLogLevel *zap.AtomicLevel) (*Peer, error) {
	peer := &Peer{
		Log:      log,
		Identity: full,
		DB:       db,

		Servers:  lifecycle.NewGroup(log.Named("servers")),
		Services: lifecycle.NewGroup(log.Named("services")),
	}

	{ // setup notification service.
		peer.Notifications.Service = notifications.NewService(peer.Log, peer.DB.Notifications())
	}

	{ // setup debug
		var err error
		if config.Debug.Address != "" {
			peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
			if err != nil {
				// Not fatal: the failure is logged at debug level using only
				// the error's message, and setup continues.
				withoutStack := errors.New(err.Error())
				peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
			}
		}
		debugConfig := config.Debug
		debugConfig.ControlTitle = "Storage Node"
		peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel)
		peer.Servers.Add(lifecycle.Item{
			Name:  "debug",
			Run:   peer.Debug.Server.Run,
			Close: peer.Debug.Server.Close,
		})
	}

	// err is shared by the setup blocks below that assign with "="; several
	// blocks shadow it locally with ":=".
	var err error

	{ // version setup
		if !versionInfo.IsZero() {
			peer.Log.Debug("Version info",
				zap.Stringer("Version", versionInfo.Version.Version),
				zap.String("Commit Hash", versionInfo.CommitHash),
				zap.Stringer("Build Timestamp", versionInfo.Timestamp),
				zap.Bool("Release Build", versionInfo.Release),
			)
		}

		peer.Version.Service = checker.NewService(log.Named("version"), config.Version, versionInfo, "Storagenode")
		versionCheckInterval := 12 * time.Hour
		peer.Version.Chore = version2.NewChore(peer.Log.Named("version:chore"), peer.Version.Service, peer.Notifications.Service, peer.Identity.ID, versionCheckInterval)
		peer.Services.Add(lifecycle.Item{
			Name: "version",
			Run:  peer.Version.Chore.Run,
		})
	}

	{ // setup listener and server
		sc := config.Server

		tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Dialer = rpc.NewDefaultDialer(tlsOptions)

		peer.Server, err = server.New(log.Named("server"), tlsOptions, sc)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Servers.Add(lifecycle.Item{
			Name: "server",
			Run: func(ctx context.Context) error {
				// Don't change the format of this comment, it is used to figure out the node id.
				peer.Log.Info(fmt.Sprintf("Node %s started", peer.Identity.ID))
				peer.Log.Info(fmt.Sprintf("Public server started on %s", peer.Addr()))
				peer.Log.Info(fmt.Sprintf("Private server started on %s", peer.PrivateAddr()))
				return peer.Server.Run(ctx)
			},
			Close: peer.Server.Close,
		})
	}

	{ // setup trust pool
		peer.Storage2.Trust, err = trust.NewPool(log.Named("trust"), trust.Dialer(peer.Dialer), config.Storage2.Trust, peer.DB.Satellites())
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		peer.Services.Add(lifecycle.Item{
			Name: "trust",
			Run:  peer.Storage2.Trust.Run,
		})
	}

	{ // setup preflight local time check
		peer.Preflight.LocalTime = preflight.NewLocalTime(peer.Log.Named("preflight:localtime"), config.Preflight, peer.Storage2.Trust, peer.Dialer)
	}

	{ // setup contact service
		// c is a local copy; when no external address is configured it
		// falls back to the server's public address.
		c := config.Contact
		if c.ExternalAddress == "" {
			c.ExternalAddress = peer.Addr()
		}

		pbVersion, err := versionInfo.Proto()
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		self := contact.NodeInfo{
			ID:      peer.ID(),
			Address: c.ExternalAddress,
			Operator: pb.NodeOperator{
				Email:          config.Operator.Email,
				Wallet:         config.Operator.Wallet,
				WalletFeatures: config.Operator.WalletFeatures,
			},
			Version: *pbVersion,
		}
		peer.Contact.PingStats = new(contact.PingStats)
		peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), peer.Dialer, self, peer.Storage2.Trust)

		peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Contact.Service)
		peer.Services.Add(lifecycle.Item{
			Name:  "contact:chore",
			Run:   peer.Contact.Chore.Run,
			Close: peer.Contact.Chore.Close,
		})

		peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Storage2.Trust, peer.Contact.PingStats)
		if err := pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
	}

	{ // setup storage
		peer.Storage2.BlobsCache = pieces.NewBlobsUsageCache(peer.Log.Named("blobscache"), peer.DB.Pieces())

		peer.Storage2.Store = pieces.NewStore(peer.Log.Named("pieces"),
			peer.Storage2.BlobsCache,
			peer.DB.V0PieceInfo(),
			peer.DB.PieceExpirationDB(),
			peer.DB.PieceSpaceUsedDB(),
			config.Pieces,
		)

		peer.Storage2.PieceDeleter = pieces.NewDeleter(log.Named("piecedeleter"), peer.Storage2.Store, config.Storage2.DeleteWorkers, config.Storage2.DeleteQueueSize)
		peer.Services.Add(lifecycle.Item{
			Name:  "PieceDeleter",
			Run:   peer.Storage2.PieceDeleter.Run,
			Close: peer.Storage2.PieceDeleter.Close,
		})

		peer.Storage2.TrashChore = pieces.NewTrashChore(
			log.Named("pieces:trash"),
			24*time.Hour,   // choreInterval: how often to run the chore
			7*24*time.Hour, // trashExpiryInterval: when items in the trash should be deleted
			peer.Storage2.Trust,
			peer.Storage2.Store,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "pieces:trash",
			Run:   peer.Storage2.TrashChore.Run,
			Close: peer.Storage2.TrashChore.Close,
		})

		peer.Storage2.CacheService = pieces.NewService(
			log.Named("piecestore:cache"),
			peer.Storage2.BlobsCache,
			peer.Storage2.Store,
			config.Storage2.CacheSyncInterval,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "piecestore:cache",
			Run:   peer.Storage2.CacheService.Run,
			Close: peer.Storage2.CacheService.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Piecestore Cache", peer.Storage2.CacheService.Loop))

		peer.Storage2.Monitor = monitor.NewService(
			log.Named("piecestore:monitor"),
			peer.Storage2.Store,
			peer.Contact.Service,
			peer.DB.Bandwidth(),
			config.Storage.AllocatedDiskSpace.Int64(),
			// TODO: use config.Storage.Monitor.Interval, but for some reason is not set
			config.Storage.KBucketRefreshInterval,
			peer.Contact.Chore.Trigger,
			config.Storage2.Monitor,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "piecestore:monitor",
			Run:   peer.Storage2.Monitor.Run,
			Close: peer.Storage2.Monitor.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Piecestore Monitor", peer.Storage2.Monitor.Loop))

		peer.Storage2.RetainService = retain.NewService(
			peer.Log.Named("retain"),
			peer.Storage2.Store,
			config.Retain,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "retain",
			Run:   peer.Storage2.RetainService.Run,
			Close: peer.Storage2.RetainService.Close,
		})

		peer.UsedSerials = usedserials.NewTable(config.Storage2.MaxUsedSerialsSize)

		peer.OrdersStore, err = orders.NewFileStore(
			peer.Log.Named("ordersfilestore"),
			config.Storage2.Orders.Path,
			config.Storage2.OrderLimitGracePeriod,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
			peer.Log.Named("piecestore"),
			signing.SignerFromFullIdentity(peer.Identity),
			peer.Storage2.Trust,
			peer.Storage2.Monitor,
			peer.Storage2.RetainService,
			peer.Contact.PingStats,
			peer.Storage2.Store,
			peer.Storage2.PieceDeleter,
			peer.OrdersStore,
			peer.DB.Bandwidth(),
			peer.UsedSerials,
			config.Storage2,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		if err := pb.DRPCRegisterPiecestore(peer.Server.DRPC(), peer.Storage2.Endpoint); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		// TODO workaround for custom timeout for order sending request (read/write)
		// A second dialer is built from fresh TLS options so that its dial
		// timeout can differ from peer.Dialer's.
		sc := config.Server

		tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		dialer := rpc.NewDefaultDialer(tlsOptions)
		dialer.DialTimeout = config.Storage2.Orders.SenderDialTimeout

		peer.Storage2.Orders = orders.NewService(
			log.Named("orders"),
			dialer,
			peer.OrdersStore,
			peer.DB.Orders(),
			peer.Storage2.Trust,
			config.Storage2.Orders,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "orders",
			Run:   peer.Storage2.Orders.Run,
			Close: peer.Storage2.Orders.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Orders Sender", peer.Storage2.Orders.Sender))
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Orders Cleanup", peer.Storage2.Orders.Cleanup))
	}

	{ // setup payouts.
		peer.Payout.Service, err = payouts.NewService(
			peer.Log.Named("payouts:service"),
			peer.DB.Payout(),
			peer.DB.Reputation(),
			peer.DB.Satellites(),
			peer.Storage2.Trust,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Payout.Endpoint = payouts.NewEndpoint(
			peer.Log.Named("payouts:endpoint"),
			peer.Dialer,
			peer.Storage2.Trust,
		)
	}

	{ // setup reputation service.
		peer.Reputation = reputation.NewService(
			peer.Log.Named("reputation:service"),
			peer.DB.Reputation(),
			peer.Identity.ID,
			peer.Notifications.Service,
		)
	}

	{ // setup node stats service
		peer.NodeStats.Service = nodestats.NewService(
			peer.Log.Named("nodestats:service"),
			peer.Dialer,
			peer.Storage2.Trust,
		)

		peer.NodeStats.Cache = nodestats.NewCache(
			peer.Log.Named("nodestats:cache"),
			config.Nodestats,
			nodestats.CacheStorage{
				Reputation:   peer.DB.Reputation(),
				StorageUsage: peer.DB.StorageUsage(),
				Payout:       peer.DB.Payout(),
				Pricing:      peer.DB.Pricing(),
			},
			peer.NodeStats.Service,
			peer.Payout.Endpoint,
			peer.Reputation,
			peer.Storage2.Trust,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "nodestats:cache",
			Run:   peer.NodeStats.Cache.Run,
			Close: peer.NodeStats.Cache.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Node Stats Cache Reputation", peer.NodeStats.Cache.Reputation))
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Node Stats Cache Storage", peer.NodeStats.Cache.Storage))
	}

	{ // setup estimation service
		peer.Estimation.Service = estimatedpayouts.NewService(
			peer.DB.Bandwidth(),
			peer.DB.Reputation(),
			peer.DB.StorageUsage(),
			peer.DB.Pricing(),
			peer.DB.Satellites(),
			peer.Storage2.Trust,
		)
	}

	{ // setup storage node operator dashboard
		peer.Console.Service, err = console.NewService(
			peer.Log.Named("console:service"),
			peer.DB.Bandwidth(),
			peer.Storage2.Store,
			peer.Version.Service,
			config.Storage.AllocatedDiskSpace,
			config.Operator.Wallet,
			versionInfo,
			peer.Storage2.Trust,
			peer.DB.Reputation(),
			peer.DB.StorageUsage(),
			peer.DB.Pricing(),
			peer.DB.Satellites(),
			peer.Contact.PingStats,
			peer.Contact.Service,
			peer.Estimation.Service,
			peer.Storage2.BlobsCache,
			config.Operator.WalletFeatures,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		// Unlike the debug listener, failing to listen on the console
		// address aborts setup.
		peer.Console.Listener, err = net.Listen("tcp", config.Console.Address)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		// Serve the bundled console assets unless a static directory
		// overrides them.
		assets := consoleassets.FileSystem
		if config.Console.StaticDir != "" {
			// a specific directory has been configured. use it
			assets = http.Dir(config.Console.StaticDir)
		}

		peer.Console.Endpoint = consoleserver.NewServer(
			peer.Log.Named("console:endpoint"),
			assets,
			peer.Notifications.Service,
			peer.Console.Service,
			peer.Payout.Service,
			peer.Console.Listener,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "console:endpoint",
			Run:   peer.Console.Endpoint.Run,
			Close: peer.Console.Endpoint.Close,
		})
	}

	{ // setup storage inspector
		// NOTE(review): this passes config.Contact.ExternalAddress as
		// configured, not the defaulted copy the contact setup block uses
		// for the node's own address — confirm this is intended.
		peer.Storage2.Inspector = inspector.NewEndpoint(
			peer.Log.Named("pieces:inspector"),
			peer.Storage2.Store,
			peer.Contact.Service,
			peer.Contact.PingStats,
			peer.DB.Bandwidth(),
			config.Storage,
			peer.Console.Listener.Addr(),
			config.Contact.ExternalAddress,
		)
		if err := internalpb.DRPCRegisterPieceStoreInspector(peer.Server.PrivateDRPC(), peer.Storage2.Inspector); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
	}

	{ // setup piecetransfer service
		peer.PieceTransfer.Service = piecetransfer.NewService(
			peer.Log.Named("piecetransfer"),
			peer.Storage2.Store,
			peer.Storage2.Trust,
			peer.Dialer,
			// using GracefulExit config here for historical reasons
			config.GracefulExit.MinDownloadTimeout,
			config.GracefulExit.MinBytesPerSecond,
		)
	}

	{ // setup graceful exit service
		peer.GracefulExit.Service = gracefulexit.NewService(
			peer.Log.Named("gracefulexit:service"),
			peer.Storage2.Store,
			peer.Storage2.Trust,
			peer.DB.Satellites(),
			peer.Dialer,
			config.GracefulExit,
		)

		peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
			peer.Log.Named("gracefulexit:endpoint"),
			peer.Storage2.Trust,
			peer.DB.Satellites(),
			peer.Dialer,
			peer.Storage2.BlobsCache,
		)
		if err := internalpb.DRPCRegisterNodeGracefulExit(peer.Server.PrivateDRPC(), peer.GracefulExit.Endpoint); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.GracefulExit.Chore = gracefulexit.NewChore(
			peer.Log.Named("gracefulexit:chore"),
			peer.GracefulExit.Service,
			peer.PieceTransfer.Service,
			peer.Dialer,
			config.GracefulExit,
		)
		peer.GracefulExit.BlobsCleaner = gracefulexit.NewBlobsCleaner(
			peer.Log.Named("gracefulexit:blobscleaner"),
			peer.Storage2.Store,
			peer.Storage2.Trust,
			peer.DB.Satellites(),
		)
		// Runs once on node start to clean blobs from the trash that were left after a successful graceful exit.
		peer.Services.Add(lifecycle.Item{
			Name: "gracefulexit:blobscleaner",
			Run:  peer.GracefulExit.BlobsCleaner.RemoveBlobs,
		})
		peer.Services.Add(lifecycle.Item{
			Name:  "gracefulexit:chore",
			Run:   peer.GracefulExit.Chore.Run,
			Close: peer.GracefulExit.Chore.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
	}

	// setup collector service
	peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.UsedSerials, config.Collector)
	peer.Services.Add(lifecycle.Item{
		Name:  "collector",
		Run:   peer.Collector.Run,
		Close: peer.Collector.Close,
	})
	peer.Debug.Server.Panel.Add(
		debug.Cycle("Collector", peer.Collector.Loop))

	// setup bandwidth service
	peer.Bandwidth = bandwidth.NewService(peer.Log.Named("bandwidth"), peer.DB.Bandwidth(), config.Bandwidth)
	peer.Services.Add(lifecycle.Item{
		Name:  "bandwidth",
		Run:   peer.Bandwidth.Run,
		Close: peer.Bandwidth.Close,
	})
	peer.Debug.Server.Panel.Add(
		debug.Cycle("Bandwidth", peer.Bandwidth.Loop))

	{ // setup multinode endpoints
		// TODO: add to peer?
		apiKeys := apikeys.NewService(peer.DB.APIKeys())

		peer.Multinode.Storage = multinode.NewStorageEndpoint(
			peer.Log.Named("multinode:storage-endpoint"),
			apiKeys,
			peer.Storage2.Monitor,
			peer.DB.StorageUsage(),
		)

		peer.Multinode.Bandwidth = multinode.NewBandwidthEndpoint(
			peer.Log.Named("multinode:bandwidth-endpoint"),
			apiKeys,
			peer.DB.Bandwidth(),
		)

		peer.Multinode.Node = multinode.NewNodeEndpoint(
			peer.Log.Named("multinode:node-endpoint"),
			config.Operator,
			apiKeys,
			peer.Version.Service.Info,
			peer.Contact.PingStats,
			peer.DB.Reputation(),
			peer.Storage2.Trust,
		)

		peer.Multinode.Payout = multinode.NewPayoutEndpoint(
			peer.Log.Named("multinode:payout-endpoint"),
			apiKeys,
			peer.DB.Payout(),
			peer.Estimation.Service,
			peer.Payout.Service,
		)

		if err = multinodepb.DRPCRegisterStorage(peer.Server.DRPC(), peer.Multinode.Storage); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		if err = multinodepb.DRPCRegisterBandwidth(peer.Server.DRPC(), peer.Multinode.Bandwidth); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		if err = multinodepb.DRPCRegisterNode(peer.Server.DRPC(), peer.Multinode.Node); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		// The same payout endpoint is registered under both the Payout and
		// the Payouts service.
		if err = multinodepb.DRPCRegisterPayout(peer.Server.DRPC(), peer.Multinode.Payout); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		if err = multinodepb.DRPCRegisterPayouts(peer.Server.DRPC(), peer.Multinode.Payout); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
	}

	return peer, nil
}
835
836// Run runs storage node until it's either closed or it errors.
837func (peer *Peer) Run(ctx context.Context) (err error) {
838	defer mon.Task()(&ctx)(&err)
839
840	// Refresh the trust pool first. It will be updated periodically via
841	// Run() below.
842	if err := peer.Storage2.Trust.Refresh(ctx); err != nil {
843		return err
844	}
845
846	if err := peer.Preflight.LocalTime.Check(ctx); err != nil {
847		peer.Log.Error("Failed preflight check.", zap.Error(err))
848		return err
849	}
850
851	group, ctx := errgroup.WithContext(ctx)
852
853	peer.Servers.Run(ctx, group)
854	peer.Services.Run(ctx, group)
855
856	return group.Wait()
857}
858
859// Close closes all the resources.
860func (peer *Peer) Close() error {
861	return errs.Combine(
862		peer.Servers.Close(),
863		peer.Services.Close(),
864	)
865}
866
867// ID returns the peer ID.
868func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID }
869
870// Addr returns the public address.
871func (peer *Peer) Addr() string { return peer.Server.Addr().String() }
872
873// URL returns the storj.NodeURL.
874func (peer *Peer) URL() storj.NodeURL { return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()} }
875
876// PrivateAddr returns the private address.
877func (peer *Peer) PrivateAddr() string { return peer.Server.PrivateAddr().String() }
878