1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package storagenode 5 6import ( 7 "context" 8 "errors" 9 "fmt" 10 "net" 11 "net/http" 12 "path/filepath" 13 "time" 14 15 "github.com/spacemonkeygo/monkit/v3" 16 "github.com/zeebo/errs" 17 "go.uber.org/zap" 18 "golang.org/x/sync/errgroup" 19 20 "storj.io/common/identity" 21 "storj.io/common/pb" 22 "storj.io/common/peertls/extensions" 23 "storj.io/common/peertls/tlsopts" 24 "storj.io/common/rpc" 25 "storj.io/common/signing" 26 "storj.io/common/storj" 27 "storj.io/private/debug" 28 "storj.io/private/version" 29 "storj.io/storj/private/lifecycle" 30 "storj.io/storj/private/multinodepb" 31 "storj.io/storj/private/server" 32 "storj.io/storj/private/version/checker" 33 "storj.io/storj/storage" 34 "storj.io/storj/storage/filestore" 35 "storj.io/storj/storagenode/apikeys" 36 "storj.io/storj/storagenode/bandwidth" 37 "storj.io/storj/storagenode/collector" 38 "storj.io/storj/storagenode/console" 39 "storj.io/storj/storagenode/console/consoleassets" 40 "storj.io/storj/storagenode/console/consoleserver" 41 "storj.io/storj/storagenode/contact" 42 "storj.io/storj/storagenode/gracefulexit" 43 "storj.io/storj/storagenode/inspector" 44 "storj.io/storj/storagenode/internalpb" 45 "storj.io/storj/storagenode/monitor" 46 "storj.io/storj/storagenode/multinode" 47 "storj.io/storj/storagenode/nodestats" 48 "storj.io/storj/storagenode/notifications" 49 "storj.io/storj/storagenode/operator" 50 "storj.io/storj/storagenode/orders" 51 "storj.io/storj/storagenode/payouts" 52 "storj.io/storj/storagenode/payouts/estimatedpayouts" 53 "storj.io/storj/storagenode/pieces" 54 "storj.io/storj/storagenode/piecestore" 55 "storj.io/storj/storagenode/piecestore/usedserials" 56 "storj.io/storj/storagenode/piecetransfer" 57 "storj.io/storj/storagenode/preflight" 58 "storj.io/storj/storagenode/pricing" 59 "storj.io/storj/storagenode/reputation" 60 "storj.io/storj/storagenode/retain" 61 "storj.io/storj/storagenode/satellites" 62 
	"storj.io/storj/storagenode/storagenodedb"
	"storj.io/storj/storagenode/storageusage"
	"storj.io/storj/storagenode/trust"
	version2 "storj.io/storj/storagenode/version"
)

var (
	// mon is the package-level monkit scope used for instrumentation.
	mon = monkit.Package()
)

// DB is the master database for Storage Node.
//
// architecture: Master Database
type DB interface {
	// MigrateToLatest initializes the database
	MigrateToLatest(ctx context.Context) error
	// Close closes the database
	Close() error

	// Pieces returns the blob store that holds piece content.
	Pieces() storage.Blobs

	Orders() orders.DB
	V0PieceInfo() pieces.V0PieceInfoDB
	PieceExpirationDB() pieces.PieceExpirationDB
	PieceSpaceUsedDB() pieces.PieceSpaceUsedDB
	Bandwidth() bandwidth.DB
	Reputation() reputation.DB
	StorageUsage() storageusage.DB
	Satellites() satellites.DB
	Notifications() notifications.DB
	Payout() payouts.DB
	Pricing() pricing.DB
	APIKeys() apikeys.DB

	// Preflight checks that the database is in a usable state before startup.
	Preflight(ctx context.Context) error
}

// Config is all the configuration parameters for a Storage Node.
type Config struct {
	Identity identity.Config

	Server server.Config
	Debug  debug.Config

	Preflight preflight.Config
	Contact   contact.Config
	Operator  operator.Config

	// TODO: flatten storage config and only keep the new one
	Storage   piecestore.OldConfig
	Storage2  piecestore.Config
	Collector collector.Config

	Filestore filestore.Config

	Pieces pieces.Config

	Retain retain.Config

	Nodestats nodestats.Config

	Console consoleserver.Config

	Version checker.Config

	Bandwidth bandwidth.Config

	GracefulExit gracefulexit.Config
}

// DatabaseConfig returns the storagenodedb.Config that should be used with this Config.
133func (config *Config) DatabaseConfig() storagenodedb.Config { 134 dbdir := config.Storage2.DatabaseDir 135 if dbdir == "" { 136 dbdir = config.Storage.Path 137 } 138 return storagenodedb.Config{ 139 Storage: config.Storage.Path, 140 Info: filepath.Join(dbdir, "piecestore.db"), 141 Info2: filepath.Join(dbdir, "info.db"), 142 Pieces: config.Storage.Path, 143 Filestore: config.Filestore, 144 } 145} 146 147// Verify verifies whether configuration is consistent and acceptable. 148func (config *Config) Verify(log *zap.Logger) error { 149 err := config.Operator.Verify(log) 150 if err != nil { 151 return err 152 } 153 154 if config.Contact.ExternalAddress != "" { 155 err := isAddressValid(config.Contact.ExternalAddress) 156 if err != nil { 157 return errs.New("invalid contact.external-address: %v", err) 158 } 159 } 160 161 if config.Server.Address != "" { 162 err := isAddressValid(config.Server.Address) 163 if err != nil { 164 return errs.New("invalid server.address: %v", err) 165 } 166 } 167 168 return nil 169} 170 171func isAddressValid(addrstring string) error { 172 addr, port, err := net.SplitHostPort(addrstring) 173 if err != nil || port == "" { 174 return errs.New("split host-port %q failed: %+v", addrstring, err) 175 } 176 if addr == "" { 177 return nil 178 } 179 resolvedhosts, err := net.LookupHost(addr) 180 if err != nil || len(resolvedhosts) == 0 { 181 return errs.New("lookup %q failed: %+v", addr, err) 182 } 183 184 return nil 185} 186 187// Peer is the representation of a Storage Node. 
//
// architecture: Peer
type Peer struct {
	// core dependencies
	Log         *zap.Logger
	Identity    *identity.FullIdentity
	DB          DB
	UsedSerials *usedserials.Table
	OrdersStore *orders.FileStore

	Servers  *lifecycle.Group
	Services *lifecycle.Group

	Dialer rpc.Dialer

	Server *server.Server

	Version struct {
		Chore   *version2.Chore
		Service *checker.Service
	}

	Debug struct {
		Listener net.Listener
		Server   *debug.Server
	}

	// services and endpoints
	// TODO: similar grouping to satellite.Core

	Preflight struct {
		LocalTime *preflight.LocalTime
	}

	Contact struct {
		Service   *contact.Service
		Chore     *contact.Chore
		Endpoint  *contact.Endpoint
		PingStats *contact.PingStats
	}

	Estimation struct {
		Service *estimatedpayouts.Service
	}

	Storage2 struct {
		// TODO: lift things outside of it to organize better
		Trust         *trust.Pool
		Store         *pieces.Store
		TrashChore    *pieces.TrashChore
		BlobsCache    *pieces.BlobsUsageCache
		CacheService  *pieces.CacheService
		RetainService *retain.Service
		PieceDeleter  *pieces.Deleter
		Endpoint      *piecestore.Endpoint
		Inspector     *inspector.Endpoint
		Monitor       *monitor.Service
		Orders        *orders.Service
	}

	Collector *collector.Service

	NodeStats struct {
		Service *nodestats.Service
		Cache   *nodestats.Cache
	}

	// Web server with web UI
	Console struct {
		Listener net.Listener
		Service  *console.Service
		Endpoint *consoleserver.Server
	}

	PieceTransfer struct {
		Service piecetransfer.Service
	}

	GracefulExit struct {
		Service      gracefulexit.Service
		Endpoint     *gracefulexit.Endpoint
		Chore        *gracefulexit.Chore
		BlobsCleaner *gracefulexit.BlobsCleaner
	}

	Notifications struct {
		Service *notifications.Service
	}

	Payout struct {
		Service  *payouts.Service
		Endpoint *payouts.Endpoint
	}

	Bandwidth *bandwidth.Service

	Reputation *reputation.Service

	Multinode struct {
		Storage   *multinode.StorageEndpoint
		Bandwidth *multinode.BandwidthEndpoint
		Node      *multinode.NodeEndpoint
		Payout    *multinode.PayoutEndpoint
	}
}

// New creates a new Storage Node.
//
// The setup blocks below are order dependent: later subsystems take values
// built by earlier ones as constructor arguments (for example, most services
// need Storage2.Trust and Dialer), so do not reorder them without checking
// those dependencies.
func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB extensions.RevocationDB, config Config, versionInfo version.Info, atomicLogLevel *zap.AtomicLevel) (*Peer, error) {
	peer := &Peer{
		Log:      log,
		Identity: full,
		DB:       db,

		Servers:  lifecycle.NewGroup(log.Named("servers")),
		Services: lifecycle.NewGroup(log.Named("services")),
	}

	{ // setup notification service.
		peer.Notifications.Service = notifications.NewService(peer.Log, peer.DB.Notifications())
	}

	{ // setup debug
		var err error
		if config.Debug.Address != "" {
			peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
			if err != nil {
				// A failed debug listener is logged but not fatal; the node
				// starts without debug endpoints.
				withoutStack := errors.New(err.Error())
				peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
			}
		}
		debugConfig := config.Debug
		debugConfig.ControlTitle = "Storage Node"
		peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel)
		peer.Servers.Add(lifecycle.Item{
			Name:  "debug",
			Run:   peer.Debug.Server.Run,
			Close: peer.Debug.Server.Close,
		})
	}

	// err is shared by the setup blocks below that assign peer fields directly.
	var err error

	{ // version setup
		if !versionInfo.IsZero() {
			peer.Log.Debug("Version info",
				zap.Stringer("Version", versionInfo.Version.Version),
				zap.String("Commit Hash", versionInfo.CommitHash),
				zap.Stringer("Build Timestamp", versionInfo.Timestamp),
				zap.Bool("Release Build", versionInfo.Release),
			)
		}

		peer.Version.Service = checker.NewService(log.Named("version"), config.Version, versionInfo, "Storagenode")
		versionCheckInterval := 12 * time.Hour
		peer.Version.Chore = version2.NewChore(peer.Log.Named("version:chore"), peer.Version.Service, peer.Notifications.Service, peer.Identity.ID, versionCheckInterval)
		peer.Services.Add(lifecycle.Item{
			Name: "version",
			Run:  peer.Version.Chore.Run,
		})
	}

	{ // setup listener and server
		sc := config.Server

		tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Dialer = rpc.NewDefaultDialer(tlsOptions)

		peer.Server, err = server.New(log.Named("server"), tlsOptions, sc)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Servers.Add(lifecycle.Item{
			Name: "server",
			Run: func(ctx context.Context) error {
				// Don't change the format of this comment, it is used to figure out the node id.
				peer.Log.Info(fmt.Sprintf("Node %s started", peer.Identity.ID))
				peer.Log.Info(fmt.Sprintf("Public server started on %s", peer.Addr()))
				peer.Log.Info(fmt.Sprintf("Private server started on %s", peer.PrivateAddr()))
				return peer.Server.Run(ctx)
			},
			Close: peer.Server.Close,
		})
	}

	{ // setup trust pool
		peer.Storage2.Trust, err = trust.NewPool(log.Named("trust"), trust.Dialer(peer.Dialer), config.Storage2.Trust, peer.DB.Satellites())
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		peer.Services.Add(lifecycle.Item{
			Name: "trust",
			Run:  peer.Storage2.Trust.Run,
		})
	}

	{
		peer.Preflight.LocalTime = preflight.NewLocalTime(peer.Log.Named("preflight:localtime"), config.Preflight, peer.Storage2.Trust, peer.Dialer)
	}

	{ // setup contact service
		c := config.Contact
		if c.ExternalAddress == "" {
			c.ExternalAddress = peer.Addr()
		}

		pbVersion, err := versionInfo.Proto()
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		self := contact.NodeInfo{
			ID:      peer.ID(),
			Address: c.ExternalAddress,
			Operator: pb.NodeOperator{
				Email:          config.Operator.Email,
				Wallet:         config.Operator.Wallet,
				WalletFeatures: config.Operator.WalletFeatures,
			},
			Version: *pbVersion,
		}
		peer.Contact.PingStats = new(contact.PingStats)
		peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), peer.Dialer, self, peer.Storage2.Trust)

		peer.Contact.Chore = contact.NewChore(peer.Log.Named("contact:chore"), config.Contact.Interval, peer.Contact.Service)
		peer.Services.Add(lifecycle.Item{
			Name:  "contact:chore",
			Run:   peer.Contact.Chore.Run,
			Close: peer.Contact.Chore.Close,
		})

		peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Storage2.Trust, peer.Contact.PingStats)
		if err := pb.DRPCRegisterContact(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
	}

	{ // setup storage
		peer.Storage2.BlobsCache = pieces.NewBlobsUsageCache(peer.Log.Named("blobscache"), peer.DB.Pieces())

		peer.Storage2.Store = pieces.NewStore(peer.Log.Named("pieces"),
			peer.Storage2.BlobsCache,
			peer.DB.V0PieceInfo(),
			peer.DB.PieceExpirationDB(),
			peer.DB.PieceSpaceUsedDB(),
			config.Pieces,
		)

		peer.Storage2.PieceDeleter = pieces.NewDeleter(log.Named("piecedeleter"), peer.Storage2.Store, config.Storage2.DeleteWorkers, config.Storage2.DeleteQueueSize)
		peer.Services.Add(lifecycle.Item{
			Name:  "PieceDeleter",
			Run:   peer.Storage2.PieceDeleter.Run,
			Close: peer.Storage2.PieceDeleter.Close,
		})

		peer.Storage2.TrashChore = pieces.NewTrashChore(
			log.Named("pieces:trash"),
			24*time.Hour,   // choreInterval: how often to run the chore
			7*24*time.Hour, // trashExpiryInterval: when items in the trash should be deleted
			peer.Storage2.Trust,
			peer.Storage2.Store,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "pieces:trash",
			Run:   peer.Storage2.TrashChore.Run,
			Close: peer.Storage2.TrashChore.Close,
		})

		peer.Storage2.CacheService = pieces.NewService(
			log.Named("piecestore:cache"),
			peer.Storage2.BlobsCache,
			peer.Storage2.Store,
			config.Storage2.CacheSyncInterval,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "piecestore:cache",
			Run:   peer.Storage2.CacheService.Run,
			Close: peer.Storage2.CacheService.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Piecestore Cache", peer.Storage2.CacheService.Loop))

		peer.Storage2.Monitor = monitor.NewService(
			log.Named("piecestore:monitor"),
			peer.Storage2.Store,
			peer.Contact.Service,
			peer.DB.Bandwidth(),
			config.Storage.AllocatedDiskSpace.Int64(),
			// TODO: use config.Storage.Monitor.Interval, but for some reason is not set
			config.Storage.KBucketRefreshInterval,
			peer.Contact.Chore.Trigger,
			config.Storage2.Monitor,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "piecestore:monitor",
			Run:   peer.Storage2.Monitor.Run,
			Close: peer.Storage2.Monitor.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Piecestore Monitor", peer.Storage2.Monitor.Loop))

		peer.Storage2.RetainService = retain.NewService(
			peer.Log.Named("retain"),
			peer.Storage2.Store,
			config.Retain,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "retain",
			Run:   peer.Storage2.RetainService.Run,
			Close: peer.Storage2.RetainService.Close,
		})

		peer.UsedSerials = usedserials.NewTable(config.Storage2.MaxUsedSerialsSize)

		peer.OrdersStore, err = orders.NewFileStore(
			peer.Log.Named("ordersfilestore"),
			config.Storage2.Orders.Path,
			config.Storage2.OrderLimitGracePeriod,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
			peer.Log.Named("piecestore"),
			signing.SignerFromFullIdentity(peer.Identity),
			peer.Storage2.Trust,
			peer.Storage2.Monitor,
			peer.Storage2.RetainService,
			peer.Contact.PingStats,
			peer.Storage2.Store,
			peer.Storage2.PieceDeleter,
			peer.OrdersStore,
			peer.DB.Bandwidth(),
			peer.UsedSerials,
			config.Storage2,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		if err := pb.DRPCRegisterPiecestore(peer.Server.DRPC(), peer.Storage2.Endpoint); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		// TODO workaround for custom timeout for order sending request (read/write)
		// A dedicated dialer is built here so the order sender can use its own
		// dial timeout without affecting peer.Dialer.
		sc := config.Server

		tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		dialer := rpc.NewDefaultDialer(tlsOptions)
		dialer.DialTimeout = config.Storage2.Orders.SenderDialTimeout

		peer.Storage2.Orders = orders.NewService(
			log.Named("orders"),
			dialer,
			peer.OrdersStore,
			peer.DB.Orders(),
			peer.Storage2.Trust,
			config.Storage2.Orders,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "orders",
			Run:   peer.Storage2.Orders.Run,
			Close: peer.Storage2.Orders.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Orders Sender", peer.Storage2.Orders.Sender))
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Orders Cleanup", peer.Storage2.Orders.Cleanup))
	}

	{ // setup payouts.
		peer.Payout.Service, err = payouts.NewService(
			peer.Log.Named("payouts:service"),
			peer.DB.Payout(),
			peer.DB.Reputation(),
			peer.DB.Satellites(),
			peer.Storage2.Trust,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Payout.Endpoint = payouts.NewEndpoint(
			peer.Log.Named("payouts:endpoint"),
			peer.Dialer,
			peer.Storage2.Trust,
		)
	}

	{ // setup reputation service.
		peer.Reputation = reputation.NewService(
			peer.Log.Named("reputation:service"),
			peer.DB.Reputation(),
			peer.Identity.ID,
			peer.Notifications.Service,
		)
	}

	{ // setup node stats service
		peer.NodeStats.Service = nodestats.NewService(
			peer.Log.Named("nodestats:service"),
			peer.Dialer,
			peer.Storage2.Trust,
		)

		peer.NodeStats.Cache = nodestats.NewCache(
			peer.Log.Named("nodestats:cache"),
			config.Nodestats,
			nodestats.CacheStorage{
				Reputation:   peer.DB.Reputation(),
				StorageUsage: peer.DB.StorageUsage(),
				Payout:       peer.DB.Payout(),
				Pricing:      peer.DB.Pricing(),
			},
			peer.NodeStats.Service,
			peer.Payout.Endpoint,
			peer.Reputation,
			peer.Storage2.Trust,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "nodestats:cache",
			Run:   peer.NodeStats.Cache.Run,
			Close: peer.NodeStats.Cache.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Node Stats Cache Reputation", peer.NodeStats.Cache.Reputation))
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Node Stats Cache Storage", peer.NodeStats.Cache.Storage))
	}

	{ // setup estimation service
		peer.Estimation.Service = estimatedpayouts.NewService(
			peer.DB.Bandwidth(),
			peer.DB.Reputation(),
			peer.DB.StorageUsage(),
			peer.DB.Pricing(),
			peer.DB.Satellites(),
			peer.Storage2.Trust,
		)
	}

	{ // setup storage node operator dashboard
		peer.Console.Service, err = console.NewService(
			peer.Log.Named("console:service"),
			peer.DB.Bandwidth(),
			peer.Storage2.Store,
			peer.Version.Service,
			config.Storage.AllocatedDiskSpace,
			config.Operator.Wallet,
			versionInfo,
			peer.Storage2.Trust,
			peer.DB.Reputation(),
			peer.DB.StorageUsage(),
			peer.DB.Pricing(),
			peer.DB.Satellites(),
			peer.Contact.PingStats,
			peer.Contact.Service,
			peer.Estimation.Service,
			peer.Storage2.BlobsCache,
			config.Operator.WalletFeatures,
		)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.Console.Listener, err = net.Listen("tcp", config.Console.Address)
		if err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		assets := consoleassets.FileSystem
		if config.Console.StaticDir != "" {
			// a specific directory has been configured. use it
			assets = http.Dir(config.Console.StaticDir)
		}

		peer.Console.Endpoint = consoleserver.NewServer(
			peer.Log.Named("console:endpoint"),
			assets,
			peer.Notifications.Service,
			peer.Console.Service,
			peer.Payout.Service,
			peer.Console.Listener,
		)
		peer.Services.Add(lifecycle.Item{
			Name:  "console:endpoint",
			Run:   peer.Console.Endpoint.Run,
			Close: peer.Console.Endpoint.Close,
		})
	}

	{ // setup storage inspector
		peer.Storage2.Inspector = inspector.NewEndpoint(
			peer.Log.Named("pieces:inspector"),
			peer.Storage2.Store,
			peer.Contact.Service,
			peer.Contact.PingStats,
			peer.DB.Bandwidth(),
			config.Storage,
			peer.Console.Listener.Addr(),
			config.Contact.ExternalAddress,
		)
		if err := internalpb.DRPCRegisterPieceStoreInspector(peer.Server.PrivateDRPC(), peer.Storage2.Inspector); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
	}

	{ // setup piecetransfer service
		peer.PieceTransfer.Service = piecetransfer.NewService(
			peer.Log.Named("piecetransfer"),
			peer.Storage2.Store,
			peer.Storage2.Trust,
			peer.Dialer,
			// using GracefulExit config here for historical reasons
			config.GracefulExit.MinDownloadTimeout,
			config.GracefulExit.MinBytesPerSecond,
		)
	}

	{ // setup graceful exit service
		peer.GracefulExit.Service = gracefulexit.NewService(
			peer.Log.Named("gracefulexit:service"),
			peer.Storage2.Store,
			peer.Storage2.Trust,
			peer.DB.Satellites(),
			peer.Dialer,
			config.GracefulExit,
		)

		peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
			peer.Log.Named("gracefulexit:endpoint"),
			peer.Storage2.Trust,
			peer.DB.Satellites(),
			peer.Dialer,
			peer.Storage2.BlobsCache,
		)
		if err := internalpb.DRPCRegisterNodeGracefulExit(peer.Server.PrivateDRPC(), peer.GracefulExit.Endpoint); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}

		peer.GracefulExit.Chore = gracefulexit.NewChore(
			peer.Log.Named("gracefulexit:chore"),
			peer.GracefulExit.Service,
			peer.PieceTransfer.Service,
			peer.Dialer,
			config.GracefulExit,
		)
		peer.GracefulExit.BlobsCleaner = gracefulexit.NewBlobsCleaner(
			peer.Log.Named("gracefulexit:blobscleaner"),
			peer.Storage2.Store,
			peer.Storage2.Trust,
			peer.DB.Satellites(),
		)
		// Runs once on node start to clean blobs from trash that left after successful GE.
		peer.Services.Add(lifecycle.Item{
			Name: "gracefulexit:blobscleaner",
			Run:  peer.GracefulExit.BlobsCleaner.RemoveBlobs,
		})
		peer.Services.Add(lifecycle.Item{
			Name:  "gracefulexit:chore",
			Run:   peer.GracefulExit.Chore.Run,
			Close: peer.GracefulExit.Chore.Close,
		})
		peer.Debug.Server.Panel.Add(
			debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
	}

	peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.UsedSerials, config.Collector)
	peer.Services.Add(lifecycle.Item{
		Name:  "collector",
		Run:   peer.Collector.Run,
		Close: peer.Collector.Close,
	})
	peer.Debug.Server.Panel.Add(
		debug.Cycle("Collector", peer.Collector.Loop))

	peer.Bandwidth = bandwidth.NewService(peer.Log.Named("bandwidth"), peer.DB.Bandwidth(), config.Bandwidth)
	peer.Services.Add(lifecycle.Item{
		Name:  "bandwidth",
		Run:   peer.Bandwidth.Run,
		Close: peer.Bandwidth.Close,
	})
	peer.Debug.Server.Panel.Add(
		debug.Cycle("Bandwidth", peer.Bandwidth.Loop))

	{ // setup multinode endpoints
		// TODO: add to peer?
		apiKeys := apikeys.NewService(peer.DB.APIKeys())

		peer.Multinode.Storage = multinode.NewStorageEndpoint(
			peer.Log.Named("multinode:storage-endpoint"),
			apiKeys,
			peer.Storage2.Monitor,
			peer.DB.StorageUsage(),
		)

		peer.Multinode.Bandwidth = multinode.NewBandwidthEndpoint(
			peer.Log.Named("multinode:bandwidth-endpoint"),
			apiKeys,
			peer.DB.Bandwidth(),
		)

		peer.Multinode.Node = multinode.NewNodeEndpoint(
			peer.Log.Named("multinode:node-endpoint"),
			config.Operator,
			apiKeys,
			peer.Version.Service.Info,
			peer.Contact.PingStats,
			peer.DB.Reputation(),
			peer.Storage2.Trust,
		)

		peer.Multinode.Payout = multinode.NewPayoutEndpoint(
			peer.Log.Named("multinode:payout-endpoint"),
			apiKeys,
			peer.DB.Payout(),
			peer.Estimation.Service,
			peer.Payout.Service,
		)

		if err = multinodepb.DRPCRegisterStorage(peer.Server.DRPC(), peer.Multinode.Storage); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		if err = multinodepb.DRPCRegisterBandwidth(peer.Server.DRPC(), peer.Multinode.Bandwidth); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		if err = multinodepb.DRPCRegisterNode(peer.Server.DRPC(), peer.Multinode.Node); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		if err = multinodepb.DRPCRegisterPayout(peer.Server.DRPC(), peer.Multinode.Payout); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
		// Payout endpoint is registered under both names for compatibility.
		if err = multinodepb.DRPCRegisterPayouts(peer.Server.DRPC(), peer.Multinode.Payout); err != nil {
			return nil, errs.Combine(err, peer.Close())
		}
	}

	return peer, nil
}

// Run runs storage node until it's either closed or it errors.
func (peer *Peer) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Refresh the trust pool first. It will be updated periodically via
	// Run() below.
	if err := peer.Storage2.Trust.Refresh(ctx); err != nil {
		return err
	}

	// Abort startup if the local clock is too far off from the satellites'.
	if err := peer.Preflight.LocalTime.Check(ctx); err != nil {
		peer.Log.Error("Failed preflight check.", zap.Error(err))
		return err
	}

	group, ctx := errgroup.WithContext(ctx)

	peer.Servers.Run(ctx, group)
	peer.Services.Run(ctx, group)

	return group.Wait()
}

// Close closes all the resources.
func (peer *Peer) Close() error {
	return errs.Combine(
		peer.Servers.Close(),
		peer.Services.Close(),
	)
}

// ID returns the peer ID.
func (peer *Peer) ID() storj.NodeID { return peer.Identity.ID }

// Addr returns the public address.
func (peer *Peer) Addr() string { return peer.Server.Addr().String() }

// URL returns the storj.NodeURL.
func (peer *Peer) URL() storj.NodeURL { return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()} }

// PrivateAddr returns the private address.
func (peer *Peer) PrivateAddr() string { return peer.Server.PrivateAddr().String() }