1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package satellite 5 6import ( 7 "context" 8 "errors" 9 "fmt" 10 "net" 11 "net/mail" 12 "net/smtp" 13 14 "github.com/spacemonkeygo/monkit/v3" 15 "github.com/zeebo/errs" 16 "go.uber.org/zap" 17 "golang.org/x/sync/errgroup" 18 19 "storj.io/common/identity" 20 "storj.io/common/pb" 21 "storj.io/common/peertls/extensions" 22 "storj.io/common/peertls/tlsopts" 23 "storj.io/common/rpc" 24 "storj.io/common/signing" 25 "storj.io/common/storj" 26 "storj.io/private/debug" 27 "storj.io/private/version" 28 "storj.io/storj/private/lifecycle" 29 "storj.io/storj/private/post" 30 "storj.io/storj/private/post/oauth2" 31 "storj.io/storj/private/server" 32 "storj.io/storj/private/version/checker" 33 "storj.io/storj/satellite/accounting" 34 "storj.io/storj/satellite/analytics" 35 "storj.io/storj/satellite/buckets" 36 "storj.io/storj/satellite/console" 37 "storj.io/storj/satellite/console/consoleauth" 38 "storj.io/storj/satellite/console/consoleweb" 39 "storj.io/storj/satellite/contact" 40 "storj.io/storj/satellite/gracefulexit" 41 "storj.io/storj/satellite/inspector" 42 "storj.io/storj/satellite/internalpb" 43 "storj.io/storj/satellite/mailservice" 44 "storj.io/storj/satellite/mailservice/simulate" 45 "storj.io/storj/satellite/metabase" 46 "storj.io/storj/satellite/metainfo" 47 "storj.io/storj/satellite/metainfo/piecedeletion" 48 "storj.io/storj/satellite/nodestats" 49 "storj.io/storj/satellite/orders" 50 "storj.io/storj/satellite/overlay" 51 "storj.io/storj/satellite/payments" 52 "storj.io/storj/satellite/payments/paymentsconfig" 53 "storj.io/storj/satellite/payments/stripecoinpayments" 54 "storj.io/storj/satellite/reputation" 55 "storj.io/storj/satellite/rewards" 56 "storj.io/storj/satellite/snopayouts" 57) 58 59// API is the satellite API process. 60// 61// architecture: Peer 62type API struct { 63 Log *zap.Logger 64 Identity *identity.FullIdentity 65 DB DB 66 67 Servers *lifecycle.Group 68 Services *lifecycle.Group 69 70 Dialer rpc.Dialer 71 Server *server.Server 72 ExternalAddress string 73 74 Version struct { 75 Chore *checker.Chore 76 Service *checker.Service 77 } 78 79 Debug struct { 80 Listener net.Listener 81 Server *debug.Server 82 } 83 84 Contact struct { 85 Service *contact.Service 86 Endpoint *contact.Endpoint 87 } 88 89 Overlay struct { 90 DB overlay.DB 91 Service *overlay.Service 92 } 93 94 Reputation struct { 95 Service *reputation.Service 96 } 97 98 Orders struct { 99 DB orders.DB 100 Endpoint *orders.Endpoint 101 Service *orders.Service 102 Chore *orders.Chore 103 } 104 105 Metainfo struct { 106 Metabase *metabase.DB 107 PieceDeletion *piecedeletion.Service 108 Endpoint *metainfo.Endpoint 109 } 110 111 Inspector struct { 112 Endpoint *inspector.Endpoint 113 } 114 115 Accounting struct { 116 ProjectUsage *accounting.Service 117 } 118 119 LiveAccounting struct { 120 Cache accounting.Cache 121 } 122 123 ProjectLimits struct { 124 Cache *accounting.ProjectLimitCache 125 } 126 127 Mail struct { 128 Service *mailservice.Service 129 } 130 131 Payments struct { 132 Accounts payments.Accounts 133 Conversion *stripecoinpayments.ConversionService 134 Service *stripecoinpayments.Service 135 Stripe stripecoinpayments.StripeClient 136 } 137 138 Console struct { 139 Listener net.Listener 140 Service *console.Service 141 Endpoint *consoleweb.Server 142 } 143 144 Marketing struct { 145 PartnersService *rewards.PartnersService 146 } 147 148 NodeStats struct { 149 Endpoint *nodestats.Endpoint 150 } 151 152 SNOPayouts struct { 153 Endpoint *snopayouts.Endpoint 154 Service *snopayouts.Service 155 DB snopayouts.DB 156 } 157 158 GracefulExit struct { 159 Endpoint *gracefulexit.Endpoint 160 } 161 162 Analytics struct { 163 Service *analytics.Service 164 } 165 166 Buckets struct { 167 Service *buckets.Service 168 } 169} 170 171// NewAPI creates a new satellite API process. 172func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB, 173 metabaseDB *metabase.DB, revocationDB extensions.RevocationDB, 174 liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache, 175 config *Config, versionInfo version.Info, atomicLogLevel *zap.AtomicLevel) (*API, error) { 176 peer := &API{ 177 Log: log, 178 Identity: full, 179 DB: db, 180 ExternalAddress: config.Contact.ExternalAddress, 181 182 Servers: lifecycle.NewGroup(log.Named("servers")), 183 Services: lifecycle.NewGroup(log.Named("services")), 184 } 185 186 { // setup buckets service 187 peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB) 188 } 189 190 { // setup debug 191 var err error 192 if config.Debug.Address != "" { 193 peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address) 194 if err != nil { 195 withoutStack := errors.New(err.Error()) 196 peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack)) 197 } 198 } 199 debugConfig := config.Debug 200 debugConfig.ControlTitle = "API" 201 peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel) 202 peer.Servers.Add(lifecycle.Item{ 203 Name: "debug", 204 Run: peer.Debug.Server.Run, 205 Close: peer.Debug.Server.Close, 206 }) 207 } 208 209 var err error 210 211 { 212 peer.Log.Info("Version info", 213 zap.Stringer("Version", versionInfo.Version.Version), 214 zap.String("Commit Hash", versionInfo.CommitHash), 215 zap.Stringer("Build Timestamp", versionInfo.Timestamp), 216 zap.Bool("Release Build", versionInfo.Release), 217 ) 218 219 peer.Version.Service = checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite") 220 peer.Version.Chore = checker.NewChore(peer.Version.Service, config.Version.CheckInterval) 221 222 peer.Services.Add(lifecycle.Item{ 223 Name: "version", 224 Run: peer.Version.Chore.Run, 225 }) 226 } 227 228 { // setup listener and server 229 sc := config.Server 230 231 tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB) 232 if err != nil { 233 return nil, errs.Combine(err, peer.Close()) 234 } 235 236 peer.Dialer = rpc.NewDefaultDialer(tlsOptions) 237 238 peer.Server, err = server.New(log.Named("server"), tlsOptions, sc) 239 if err != nil { 240 return nil, errs.Combine(err, peer.Close()) 241 } 242 243 if peer.ExternalAddress == "" { 244 // not ideal, but better than nothing 245 peer.ExternalAddress = peer.Server.Addr().String() 246 } 247 248 peer.Servers.Add(lifecycle.Item{ 249 Name: "server", 250 Run: func(ctx context.Context) error { 251 // Don't change the format of this comment, it is used to figure out the node id. 252 peer.Log.Info(fmt.Sprintf("Node %s started", peer.Identity.ID)) 253 peer.Log.Info(fmt.Sprintf("Public server started on %s", peer.Addr())) 254 peer.Log.Info(fmt.Sprintf("Private server started on %s", peer.PrivateAddr())) 255 return peer.Server.Run(ctx) 256 }, 257 Close: peer.Server.Close, 258 }) 259 } 260 261 { // setup overlay 262 peer.Overlay.DB = peer.DB.OverlayCache() 263 264 peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay) 265 if err != nil { 266 return nil, errs.Combine(err, peer.Close()) 267 } 268 peer.Services.Add(lifecycle.Item{ 269 Name: "overlay", 270 Close: peer.Overlay.Service.Close, 271 }) 272 } 273 274 { // setup reputation 275 peer.Reputation.Service = reputation.NewService(peer.Log.Named("reputation"), peer.Overlay.DB, peer.DB.Reputation(), config.Reputation) 276 277 peer.Services.Add(lifecycle.Item{ 278 Name: "reputation", 279 Close: peer.Reputation.Service.Close, 280 }) 281 } 282 283 { // setup contact service 284 pbVersion, err := versionInfo.Proto() 285 if err != nil { 286 return nil, errs.Combine(err, peer.Close()) 287 } 288 289 self := &overlay.NodeDossier{ 290 Node: pb.Node{ 291 Id: peer.ID(), 292 Address: &pb.NodeAddress{ 293 Address: peer.Addr(), 294 }, 295 }, 296 Type: pb.NodeType_SATELLITE, 297 Version: *pbVersion, 298 } 299 peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact) 300 peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service) 301 if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil { 302 return nil, errs.Combine(err, peer.Close()) 303 } 304 305 peer.Services.Add(lifecycle.Item{ 306 Name: "contact:service", 307 Close: peer.Contact.Service.Close, 308 }) 309 } 310 311 { // setup live accounting 312 peer.LiveAccounting.Cache = liveAccounting 313 } 314 315 { // setup project limits 316 peer.ProjectLimits.Cache = accounting.NewProjectLimitCache(peer.DB.ProjectAccounting(), 317 config.Console.Config.UsageLimits.Storage.Free, 318 config.Console.Config.UsageLimits.Bandwidth.Free, 319 1000000, // TODO this will be correctly populated with up coming change 320 config.ProjectLimit, 321 ) 322 } 323 324 { // setup accounting project usage 325 peer.Accounting.ProjectUsage = accounting.NewService( 326 peer.DB.ProjectAccounting(), 327 peer.LiveAccounting.Cache, 328 peer.ProjectLimits.Cache, 329 config.LiveAccounting.BandwidthCacheTTL, 330 config.LiveAccounting.AsOfSystemInterval, 331 ) 332 } 333 334 { // setup orders 335 peer.Orders.DB = rollupsWriteCache 336 peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders) 337 peer.Services.Add(lifecycle.Item{ 338 Name: "orders:chore", 339 Run: peer.Orders.Chore.Run, 340 Close: peer.Orders.Chore.Close, 341 }) 342 peer.Debug.Server.Panel.Add( 343 debug.Cycle("Orders Chore", peer.Orders.Chore.Loop)) 344 var err error 345 peer.Orders.Service, err = orders.NewService( 346 peer.Log.Named("orders:service"), 347 signing.SignerFromFullIdentity(peer.Identity), 348 peer.Overlay.Service, 349 peer.Orders.DB, 350 peer.Buckets.Service, 351 config.Orders, 352 ) 353 if err != nil { 354 return nil, errs.Combine(err, peer.Close()) 355 } 356 357 satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity()) 358 peer.Orders.Endpoint = orders.NewEndpoint( 359 peer.Log.Named("orders:endpoint"), 360 satelliteSignee, 361 peer.Orders.DB, 362 peer.DB.NodeAPIVersion(), 363 config.Orders.OrdersSemaphoreSize, 364 peer.Orders.Service, 365 ) 366 367 if err := pb.DRPCRegisterOrders(peer.Server.DRPC(), peer.Orders.Endpoint); err != nil { 368 return nil, errs.Combine(err, peer.Close()) 369 } 370 } 371 372 { // setup marketing partners service 373 peer.Marketing.PartnersService = rewards.NewPartnersService( 374 peer.Log.Named("partners"), 375 rewards.DefaultPartnersDB, 376 ) 377 } 378 379 { // setup analytics service 380 peer.Analytics.Service = analytics.NewService(peer.Log.Named("analytics:service"), config.Analytics, config.Console.SatelliteName) 381 382 peer.Services.Add(lifecycle.Item{ 383 Name: "analytics:service", 384 Run: peer.Analytics.Service.Run, 385 Close: peer.Analytics.Service.Close, 386 }) 387 } 388 389 { // setup metainfo 390 peer.Metainfo.Metabase = metabaseDB 391 392 peer.Metainfo.PieceDeletion, err = piecedeletion.NewService( 393 peer.Log.Named("metainfo:piecedeletion"), 394 peer.Dialer, 395 peer.Overlay.Service, 396 config.Metainfo.PieceDeletion, 397 ) 398 if err != nil { 399 return nil, errs.Combine(err, peer.Close()) 400 } 401 peer.Services.Add(lifecycle.Item{ 402 Name: "metainfo:piecedeletion", 403 Run: peer.Metainfo.PieceDeletion.Run, 404 Close: peer.Metainfo.PieceDeletion.Close, 405 }) 406 407 peer.Metainfo.Endpoint, err = metainfo.NewEndpoint( 408 peer.Log.Named("metainfo:endpoint"), 409 peer.Buckets.Service, 410 peer.Metainfo.Metabase, 411 peer.Metainfo.PieceDeletion, 412 peer.Orders.Service, 413 peer.Overlay.Service, 414 peer.DB.Attribution(), 415 peer.Marketing.PartnersService, 416 peer.DB.PeerIdentities(), 417 peer.DB.Console().APIKeys(), 418 peer.Accounting.ProjectUsage, 419 peer.DB.Console().Projects(), 420 signing.SignerFromFullIdentity(peer.Identity), 421 peer.DB.Revocation(), 422 config.Metainfo, 423 ) 424 if err != nil { 425 return nil, errs.Combine(err, peer.Close()) 426 } 427 428 if err := pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint); err != nil { 429 return nil, errs.Combine(err, peer.Close()) 430 } 431 432 peer.Services.Add(lifecycle.Item{ 433 Name: "metainfo:endpoint", 434 Close: peer.Metainfo.Endpoint.Close, 435 }) 436 } 437 438 { // setup inspector 439 peer.Inspector.Endpoint = inspector.NewEndpoint( 440 peer.Log.Named("inspector"), 441 peer.Overlay.Service, 442 peer.Metainfo.Metabase, 443 ) 444 if err := internalpb.DRPCRegisterHealthInspector(peer.Server.PrivateDRPC(), peer.Inspector.Endpoint); err != nil { 445 return nil, errs.Combine(err, peer.Close()) 446 } 447 } 448 449 { // setup mailservice 450 // TODO(yar): test multiple satellites using same OAUTH credentials 451 mailConfig := config.Mail 452 453 // validate from mail address 454 from, err := mail.ParseAddress(mailConfig.From) 455 if err != nil { 456 return nil, errs.Combine(err, peer.Close()) 457 } 458 459 // validate smtp server address 460 host, _, err := net.SplitHostPort(mailConfig.SMTPServerAddress) 461 if err != nil { 462 return nil, errs.Combine(err, peer.Close()) 463 } 464 465 var sender mailservice.Sender 466 switch mailConfig.AuthType { 467 case "oauth2": 468 creds := oauth2.Credentials{ 469 ClientID: mailConfig.ClientID, 470 ClientSecret: mailConfig.ClientSecret, 471 TokenURI: mailConfig.TokenURI, 472 } 473 token, err := oauth2.RefreshToken(context.TODO(), creds, mailConfig.RefreshToken) 474 if err != nil { 475 return nil, errs.Combine(err, peer.Close()) 476 } 477 478 sender = &post.SMTPSender{ 479 From: *from, 480 Auth: &oauth2.Auth{ 481 UserEmail: from.Address, 482 Storage: oauth2.NewTokenStore(creds, *token), 483 }, 484 ServerAddress: mailConfig.SMTPServerAddress, 485 } 486 case "plain": 487 sender = &post.SMTPSender{ 488 From: *from, 489 Auth: smtp.PlainAuth("", mailConfig.Login, mailConfig.Password, host), 490 ServerAddress: mailConfig.SMTPServerAddress, 491 } 492 case "login": 493 sender = &post.SMTPSender{ 494 From: *from, 495 Auth: post.LoginAuth{ 496 Username: mailConfig.Login, 497 Password: mailConfig.Password, 498 }, 499 ServerAddress: mailConfig.SMTPServerAddress, 500 } 501 default: 502 sender = simulate.NewDefaultLinkClicker() 503 } 504 505 peer.Mail.Service, err = mailservice.New( 506 peer.Log.Named("mail:service"), 507 sender, 508 mailConfig.TemplatePath, 509 ) 510 if err != nil { 511 return nil, errs.Combine(err, peer.Close()) 512 } 513 514 peer.Services.Add(lifecycle.Item{ 515 Name: "mail:service", 516 Close: peer.Mail.Service.Close, 517 }) 518 } 519 520 { // setup payments 521 pc := config.Payments 522 523 var stripeClient stripecoinpayments.StripeClient 524 switch pc.Provider { 525 default: 526 stripeClient = stripecoinpayments.NewStripeMock( 527 peer.ID(), 528 peer.DB.StripeCoinPayments().Customers(), 529 peer.DB.Console().Users(), 530 ) 531 case "stripecoinpayments": 532 stripeClient = stripecoinpayments.NewStripeClient(log, pc.StripeCoinPayments) 533 } 534 535 peer.Payments.Service, err = stripecoinpayments.NewService( 536 peer.Log.Named("payments.stripe:service"), 537 stripeClient, 538 pc.StripeCoinPayments, 539 peer.DB.StripeCoinPayments(), 540 peer.DB.Console().Projects(), 541 peer.DB.ProjectAccounting(), 542 pc.StorageTBPrice, 543 pc.EgressTBPrice, 544 pc.SegmentPrice, 545 pc.BonusRate) 546 547 if err != nil { 548 return nil, errs.Combine(err, peer.Close()) 549 } 550 551 peer.Payments.Stripe = stripeClient 552 peer.Payments.Accounts = peer.Payments.Service.Accounts() 553 peer.Payments.Conversion = stripecoinpayments.NewConversionService( 554 peer.Log.Named("payments.stripe:version"), 555 peer.Payments.Service, 556 pc.StripeCoinPayments.ConversionRatesCycleInterval) 557 558 peer.Services.Add(lifecycle.Item{ 559 Name: "payments.stripe:version", 560 Run: peer.Payments.Conversion.Run, 561 Close: peer.Payments.Conversion.Close, 562 }) 563 } 564 565 { // setup console 566 consoleConfig := config.Console 567 peer.Console.Listener, err = net.Listen("tcp", consoleConfig.Address) 568 if err != nil { 569 return nil, errs.Combine(err, peer.Close()) 570 } 571 if consoleConfig.AuthTokenSecret == "" { 572 return nil, errs.New("Auth token secret required") 573 } 574 575 peer.Console.Service, err = console.NewService( 576 peer.Log.Named("console:service"), 577 &consoleauth.Hmac{Secret: []byte(consoleConfig.AuthTokenSecret)}, 578 peer.DB.Console(), 579 peer.DB.ProjectAccounting(), 580 peer.Accounting.ProjectUsage, 581 peer.Buckets.Service, 582 peer.Marketing.PartnersService, 583 peer.Payments.Accounts, 584 peer.Analytics.Service, 585 consoleConfig.Config, 586 ) 587 if err != nil { 588 return nil, errs.Combine(err, peer.Close()) 589 } 590 591 pricing := paymentsconfig.PricingValues{ 592 StorageTBPrice: config.Payments.StorageTBPrice, 593 EgressTBPrice: config.Payments.EgressTBPrice, 594 SegmentPrice: config.Payments.SegmentPrice, 595 } 596 597 peer.Console.Endpoint = consoleweb.NewServer( 598 peer.Log.Named("console:endpoint"), 599 consoleConfig, 600 peer.Console.Service, 601 peer.Mail.Service, 602 peer.Marketing.PartnersService, 603 peer.Analytics.Service, 604 peer.Console.Listener, 605 config.Payments.StripeCoinPayments.StripePublicKey, 606 pricing, 607 peer.URL(), 608 ) 609 610 peer.Servers.Add(lifecycle.Item{ 611 Name: "console:endpoint", 612 Run: peer.Console.Endpoint.Run, 613 Close: peer.Console.Endpoint.Close, 614 }) 615 } 616 617 { // setup node stats endpoint 618 peer.NodeStats.Endpoint = nodestats.NewEndpoint( 619 peer.Log.Named("nodestats:endpoint"), 620 peer.Overlay.DB, 621 peer.Reputation.Service, 622 peer.DB.StoragenodeAccounting(), 623 config.Payments, 624 ) 625 if err := pb.DRPCRegisterNodeStats(peer.Server.DRPC(), peer.NodeStats.Endpoint); err != nil { 626 return nil, errs.Combine(err, peer.Close()) 627 } 628 } 629 630 { // setup SnoPayout endpoint 631 peer.SNOPayouts.DB = peer.DB.SNOPayouts() 632 peer.SNOPayouts.Service = snopayouts.NewService( 633 peer.Log.Named("payouts:service"), 634 peer.SNOPayouts.DB) 635 peer.SNOPayouts.Endpoint = snopayouts.NewEndpoint( 636 peer.Log.Named("payouts:endpoint"), 637 peer.DB.StoragenodeAccounting(), 638 peer.Overlay.DB, 639 peer.SNOPayouts.Service) 640 if err := pb.DRPCRegisterHeldAmount(peer.Server.DRPC(), peer.SNOPayouts.Endpoint); err != nil { 641 return nil, errs.Combine(err, peer.Close()) 642 } 643 } 644 645 { // setup graceful exit 646 if config.GracefulExit.Enabled { 647 peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint( 648 peer.Log.Named("gracefulexit:endpoint"), 649 signing.SignerFromFullIdentity(peer.Identity), 650 peer.DB.GracefulExit(), 651 peer.Overlay.DB, 652 peer.Overlay.Service, 653 peer.Reputation.Service, 654 peer.Metainfo.Metabase, 655 peer.Orders.Service, 656 peer.DB.PeerIdentities(), 657 config.GracefulExit) 658 659 if err := pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint); err != nil { 660 return nil, errs.Combine(err, peer.Close()) 661 } 662 } else { 663 peer.Log.Named("gracefulexit").Info("disabled") 664 } 665 } 666 667 return peer, nil 668} 669 670// Run runs satellite until it's either closed or it errors. 671func (peer *API) Run(ctx context.Context) (err error) { 672 defer mon.Task()(&ctx)(&err) 673 674 group, ctx := errgroup.WithContext(ctx) 675 676 peer.Servers.Run(ctx, group) 677 peer.Services.Run(ctx, group) 678 679 return group.Wait() 680} 681 682// Close closes all the resources. 683func (peer *API) Close() error { 684 return errs.Combine( 685 peer.Servers.Close(), 686 peer.Services.Close(), 687 ) 688} 689 690// ID returns the peer ID. 691func (peer *API) ID() storj.NodeID { return peer.Identity.ID } 692 693// Addr returns the public address. 694func (peer *API) Addr() string { 695 return peer.ExternalAddress 696} 697 698// URL returns the storj.NodeURL. 699func (peer *API) URL() storj.NodeURL { 700 return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()} 701} 702 703// PrivateAddr returns the private address. 704func (peer *API) PrivateAddr() string { return peer.Server.PrivateAddr().String() } 705