// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
3
4package satellite
5
6import (
7	"context"
8	"errors"
9	"fmt"
10	"net"
11	"net/mail"
12	"net/smtp"
13
14	"github.com/spacemonkeygo/monkit/v3"
15	"github.com/zeebo/errs"
16	"go.uber.org/zap"
17	"golang.org/x/sync/errgroup"
18
19	"storj.io/common/identity"
20	"storj.io/common/pb"
21	"storj.io/common/peertls/extensions"
22	"storj.io/common/peertls/tlsopts"
23	"storj.io/common/rpc"
24	"storj.io/common/signing"
25	"storj.io/common/storj"
26	"storj.io/private/debug"
27	"storj.io/private/version"
28	"storj.io/storj/private/lifecycle"
29	"storj.io/storj/private/post"
30	"storj.io/storj/private/post/oauth2"
31	"storj.io/storj/private/server"
32	"storj.io/storj/private/version/checker"
33	"storj.io/storj/satellite/accounting"
34	"storj.io/storj/satellite/analytics"
35	"storj.io/storj/satellite/buckets"
36	"storj.io/storj/satellite/console"
37	"storj.io/storj/satellite/console/consoleauth"
38	"storj.io/storj/satellite/console/consoleweb"
39	"storj.io/storj/satellite/contact"
40	"storj.io/storj/satellite/gracefulexit"
41	"storj.io/storj/satellite/inspector"
42	"storj.io/storj/satellite/internalpb"
43	"storj.io/storj/satellite/mailservice"
44	"storj.io/storj/satellite/mailservice/simulate"
45	"storj.io/storj/satellite/metabase"
46	"storj.io/storj/satellite/metainfo"
47	"storj.io/storj/satellite/metainfo/piecedeletion"
48	"storj.io/storj/satellite/nodestats"
49	"storj.io/storj/satellite/orders"
50	"storj.io/storj/satellite/overlay"
51	"storj.io/storj/satellite/payments"
52	"storj.io/storj/satellite/payments/paymentsconfig"
53	"storj.io/storj/satellite/payments/stripecoinpayments"
54	"storj.io/storj/satellite/reputation"
55	"storj.io/storj/satellite/rewards"
56	"storj.io/storj/satellite/snopayouts"
57)
58
59// API is the satellite API process.
60//
61// architecture: Peer
62type API struct {
63	Log      *zap.Logger
64	Identity *identity.FullIdentity
65	DB       DB
66
67	Servers  *lifecycle.Group
68	Services *lifecycle.Group
69
70	Dialer          rpc.Dialer
71	Server          *server.Server
72	ExternalAddress string
73
74	Version struct {
75		Chore   *checker.Chore
76		Service *checker.Service
77	}
78
79	Debug struct {
80		Listener net.Listener
81		Server   *debug.Server
82	}
83
84	Contact struct {
85		Service  *contact.Service
86		Endpoint *contact.Endpoint
87	}
88
89	Overlay struct {
90		DB      overlay.DB
91		Service *overlay.Service
92	}
93
94	Reputation struct {
95		Service *reputation.Service
96	}
97
98	Orders struct {
99		DB       orders.DB
100		Endpoint *orders.Endpoint
101		Service  *orders.Service
102		Chore    *orders.Chore
103	}
104
105	Metainfo struct {
106		Metabase      *metabase.DB
107		PieceDeletion *piecedeletion.Service
108		Endpoint      *metainfo.Endpoint
109	}
110
111	Inspector struct {
112		Endpoint *inspector.Endpoint
113	}
114
115	Accounting struct {
116		ProjectUsage *accounting.Service
117	}
118
119	LiveAccounting struct {
120		Cache accounting.Cache
121	}
122
123	ProjectLimits struct {
124		Cache *accounting.ProjectLimitCache
125	}
126
127	Mail struct {
128		Service *mailservice.Service
129	}
130
131	Payments struct {
132		Accounts   payments.Accounts
133		Conversion *stripecoinpayments.ConversionService
134		Service    *stripecoinpayments.Service
135		Stripe     stripecoinpayments.StripeClient
136	}
137
138	Console struct {
139		Listener net.Listener
140		Service  *console.Service
141		Endpoint *consoleweb.Server
142	}
143
144	Marketing struct {
145		PartnersService *rewards.PartnersService
146	}
147
148	NodeStats struct {
149		Endpoint *nodestats.Endpoint
150	}
151
152	SNOPayouts struct {
153		Endpoint *snopayouts.Endpoint
154		Service  *snopayouts.Service
155		DB       snopayouts.DB
156	}
157
158	GracefulExit struct {
159		Endpoint *gracefulexit.Endpoint
160	}
161
162	Analytics struct {
163		Service *analytics.Service
164	}
165
166	Buckets struct {
167		Service *buckets.Service
168	}
169}
170
171// NewAPI creates a new satellite API process.
172func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
173	metabaseDB *metabase.DB, revocationDB extensions.RevocationDB,
174	liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
175	config *Config, versionInfo version.Info, atomicLogLevel *zap.AtomicLevel) (*API, error) {
176	peer := &API{
177		Log:             log,
178		Identity:        full,
179		DB:              db,
180		ExternalAddress: config.Contact.ExternalAddress,
181
182		Servers:  lifecycle.NewGroup(log.Named("servers")),
183		Services: lifecycle.NewGroup(log.Named("services")),
184	}
185
186	{ // setup buckets service
187		peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB)
188	}
189
190	{ // setup debug
191		var err error
192		if config.Debug.Address != "" {
193			peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
194			if err != nil {
195				withoutStack := errors.New(err.Error())
196				peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
197			}
198		}
199		debugConfig := config.Debug
200		debugConfig.ControlTitle = "API"
201		peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel)
202		peer.Servers.Add(lifecycle.Item{
203			Name:  "debug",
204			Run:   peer.Debug.Server.Run,
205			Close: peer.Debug.Server.Close,
206		})
207	}
208
209	var err error
210
211	{
212		peer.Log.Info("Version info",
213			zap.Stringer("Version", versionInfo.Version.Version),
214			zap.String("Commit Hash", versionInfo.CommitHash),
215			zap.Stringer("Build Timestamp", versionInfo.Timestamp),
216			zap.Bool("Release Build", versionInfo.Release),
217		)
218
219		peer.Version.Service = checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
220		peer.Version.Chore = checker.NewChore(peer.Version.Service, config.Version.CheckInterval)
221
222		peer.Services.Add(lifecycle.Item{
223			Name: "version",
224			Run:  peer.Version.Chore.Run,
225		})
226	}
227
228	{ // setup listener and server
229		sc := config.Server
230
231		tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
232		if err != nil {
233			return nil, errs.Combine(err, peer.Close())
234		}
235
236		peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
237
238		peer.Server, err = server.New(log.Named("server"), tlsOptions, sc)
239		if err != nil {
240			return nil, errs.Combine(err, peer.Close())
241		}
242
243		if peer.ExternalAddress == "" {
244			// not ideal, but better than nothing
245			peer.ExternalAddress = peer.Server.Addr().String()
246		}
247
248		peer.Servers.Add(lifecycle.Item{
249			Name: "server",
250			Run: func(ctx context.Context) error {
251				// Don't change the format of this comment, it is used to figure out the node id.
252				peer.Log.Info(fmt.Sprintf("Node %s started", peer.Identity.ID))
253				peer.Log.Info(fmt.Sprintf("Public server started on %s", peer.Addr()))
254				peer.Log.Info(fmt.Sprintf("Private server started on %s", peer.PrivateAddr()))
255				return peer.Server.Run(ctx)
256			},
257			Close: peer.Server.Close,
258		})
259	}
260
261	{ // setup overlay
262		peer.Overlay.DB = peer.DB.OverlayCache()
263
264		peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
265		if err != nil {
266			return nil, errs.Combine(err, peer.Close())
267		}
268		peer.Services.Add(lifecycle.Item{
269			Name:  "overlay",
270			Close: peer.Overlay.Service.Close,
271		})
272	}
273
274	{ // setup reputation
275		peer.Reputation.Service = reputation.NewService(peer.Log.Named("reputation"), peer.Overlay.DB, peer.DB.Reputation(), config.Reputation)
276
277		peer.Services.Add(lifecycle.Item{
278			Name:  "reputation",
279			Close: peer.Reputation.Service.Close,
280		})
281	}
282
283	{ // setup contact service
284		pbVersion, err := versionInfo.Proto()
285		if err != nil {
286			return nil, errs.Combine(err, peer.Close())
287		}
288
289		self := &overlay.NodeDossier{
290			Node: pb.Node{
291				Id: peer.ID(),
292				Address: &pb.NodeAddress{
293					Address: peer.Addr(),
294				},
295			},
296			Type:    pb.NodeType_SATELLITE,
297			Version: *pbVersion,
298		}
299		peer.Contact.Service = contact.NewService(peer.Log.Named("contact:service"), self, peer.Overlay.Service, peer.DB.PeerIdentities(), peer.Dialer, config.Contact)
300		peer.Contact.Endpoint = contact.NewEndpoint(peer.Log.Named("contact:endpoint"), peer.Contact.Service)
301		if err := pb.DRPCRegisterNode(peer.Server.DRPC(), peer.Contact.Endpoint); err != nil {
302			return nil, errs.Combine(err, peer.Close())
303		}
304
305		peer.Services.Add(lifecycle.Item{
306			Name:  "contact:service",
307			Close: peer.Contact.Service.Close,
308		})
309	}
310
311	{ // setup live accounting
312		peer.LiveAccounting.Cache = liveAccounting
313	}
314
315	{ // setup project limits
316		peer.ProjectLimits.Cache = accounting.NewProjectLimitCache(peer.DB.ProjectAccounting(),
317			config.Console.Config.UsageLimits.Storage.Free,
318			config.Console.Config.UsageLimits.Bandwidth.Free,
319			1000000, // TODO this will be correctly populated with up coming change
320			config.ProjectLimit,
321		)
322	}
323
324	{ // setup accounting project usage
325		peer.Accounting.ProjectUsage = accounting.NewService(
326			peer.DB.ProjectAccounting(),
327			peer.LiveAccounting.Cache,
328			peer.ProjectLimits.Cache,
329			config.LiveAccounting.BandwidthCacheTTL,
330			config.LiveAccounting.AsOfSystemInterval,
331		)
332	}
333
334	{ // setup orders
335		peer.Orders.DB = rollupsWriteCache
336		peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
337		peer.Services.Add(lifecycle.Item{
338			Name:  "orders:chore",
339			Run:   peer.Orders.Chore.Run,
340			Close: peer.Orders.Chore.Close,
341		})
342		peer.Debug.Server.Panel.Add(
343			debug.Cycle("Orders Chore", peer.Orders.Chore.Loop))
344		var err error
345		peer.Orders.Service, err = orders.NewService(
346			peer.Log.Named("orders:service"),
347			signing.SignerFromFullIdentity(peer.Identity),
348			peer.Overlay.Service,
349			peer.Orders.DB,
350			peer.Buckets.Service,
351			config.Orders,
352		)
353		if err != nil {
354			return nil, errs.Combine(err, peer.Close())
355		}
356
357		satelliteSignee := signing.SigneeFromPeerIdentity(peer.Identity.PeerIdentity())
358		peer.Orders.Endpoint = orders.NewEndpoint(
359			peer.Log.Named("orders:endpoint"),
360			satelliteSignee,
361			peer.Orders.DB,
362			peer.DB.NodeAPIVersion(),
363			config.Orders.OrdersSemaphoreSize,
364			peer.Orders.Service,
365		)
366
367		if err := pb.DRPCRegisterOrders(peer.Server.DRPC(), peer.Orders.Endpoint); err != nil {
368			return nil, errs.Combine(err, peer.Close())
369		}
370	}
371
372	{ // setup marketing partners service
373		peer.Marketing.PartnersService = rewards.NewPartnersService(
374			peer.Log.Named("partners"),
375			rewards.DefaultPartnersDB,
376		)
377	}
378
379	{ // setup analytics service
380		peer.Analytics.Service = analytics.NewService(peer.Log.Named("analytics:service"), config.Analytics, config.Console.SatelliteName)
381
382		peer.Services.Add(lifecycle.Item{
383			Name:  "analytics:service",
384			Run:   peer.Analytics.Service.Run,
385			Close: peer.Analytics.Service.Close,
386		})
387	}
388
389	{ // setup metainfo
390		peer.Metainfo.Metabase = metabaseDB
391
392		peer.Metainfo.PieceDeletion, err = piecedeletion.NewService(
393			peer.Log.Named("metainfo:piecedeletion"),
394			peer.Dialer,
395			peer.Overlay.Service,
396			config.Metainfo.PieceDeletion,
397		)
398		if err != nil {
399			return nil, errs.Combine(err, peer.Close())
400		}
401		peer.Services.Add(lifecycle.Item{
402			Name:  "metainfo:piecedeletion",
403			Run:   peer.Metainfo.PieceDeletion.Run,
404			Close: peer.Metainfo.PieceDeletion.Close,
405		})
406
407		peer.Metainfo.Endpoint, err = metainfo.NewEndpoint(
408			peer.Log.Named("metainfo:endpoint"),
409			peer.Buckets.Service,
410			peer.Metainfo.Metabase,
411			peer.Metainfo.PieceDeletion,
412			peer.Orders.Service,
413			peer.Overlay.Service,
414			peer.DB.Attribution(),
415			peer.Marketing.PartnersService,
416			peer.DB.PeerIdentities(),
417			peer.DB.Console().APIKeys(),
418			peer.Accounting.ProjectUsage,
419			peer.DB.Console().Projects(),
420			signing.SignerFromFullIdentity(peer.Identity),
421			peer.DB.Revocation(),
422			config.Metainfo,
423		)
424		if err != nil {
425			return nil, errs.Combine(err, peer.Close())
426		}
427
428		if err := pb.DRPCRegisterMetainfo(peer.Server.DRPC(), peer.Metainfo.Endpoint); err != nil {
429			return nil, errs.Combine(err, peer.Close())
430		}
431
432		peer.Services.Add(lifecycle.Item{
433			Name:  "metainfo:endpoint",
434			Close: peer.Metainfo.Endpoint.Close,
435		})
436	}
437
438	{ // setup inspector
439		peer.Inspector.Endpoint = inspector.NewEndpoint(
440			peer.Log.Named("inspector"),
441			peer.Overlay.Service,
442			peer.Metainfo.Metabase,
443		)
444		if err := internalpb.DRPCRegisterHealthInspector(peer.Server.PrivateDRPC(), peer.Inspector.Endpoint); err != nil {
445			return nil, errs.Combine(err, peer.Close())
446		}
447	}
448
449	{ // setup mailservice
450		// TODO(yar): test multiple satellites using same OAUTH credentials
451		mailConfig := config.Mail
452
453		// validate from mail address
454		from, err := mail.ParseAddress(mailConfig.From)
455		if err != nil {
456			return nil, errs.Combine(err, peer.Close())
457		}
458
459		// validate smtp server address
460		host, _, err := net.SplitHostPort(mailConfig.SMTPServerAddress)
461		if err != nil {
462			return nil, errs.Combine(err, peer.Close())
463		}
464
465		var sender mailservice.Sender
466		switch mailConfig.AuthType {
467		case "oauth2":
468			creds := oauth2.Credentials{
469				ClientID:     mailConfig.ClientID,
470				ClientSecret: mailConfig.ClientSecret,
471				TokenURI:     mailConfig.TokenURI,
472			}
473			token, err := oauth2.RefreshToken(context.TODO(), creds, mailConfig.RefreshToken)
474			if err != nil {
475				return nil, errs.Combine(err, peer.Close())
476			}
477
478			sender = &post.SMTPSender{
479				From: *from,
480				Auth: &oauth2.Auth{
481					UserEmail: from.Address,
482					Storage:   oauth2.NewTokenStore(creds, *token),
483				},
484				ServerAddress: mailConfig.SMTPServerAddress,
485			}
486		case "plain":
487			sender = &post.SMTPSender{
488				From:          *from,
489				Auth:          smtp.PlainAuth("", mailConfig.Login, mailConfig.Password, host),
490				ServerAddress: mailConfig.SMTPServerAddress,
491			}
492		case "login":
493			sender = &post.SMTPSender{
494				From: *from,
495				Auth: post.LoginAuth{
496					Username: mailConfig.Login,
497					Password: mailConfig.Password,
498				},
499				ServerAddress: mailConfig.SMTPServerAddress,
500			}
501		default:
502			sender = simulate.NewDefaultLinkClicker()
503		}
504
505		peer.Mail.Service, err = mailservice.New(
506			peer.Log.Named("mail:service"),
507			sender,
508			mailConfig.TemplatePath,
509		)
510		if err != nil {
511			return nil, errs.Combine(err, peer.Close())
512		}
513
514		peer.Services.Add(lifecycle.Item{
515			Name:  "mail:service",
516			Close: peer.Mail.Service.Close,
517		})
518	}
519
520	{ // setup payments
521		pc := config.Payments
522
523		var stripeClient stripecoinpayments.StripeClient
524		switch pc.Provider {
525		default:
526			stripeClient = stripecoinpayments.NewStripeMock(
527				peer.ID(),
528				peer.DB.StripeCoinPayments().Customers(),
529				peer.DB.Console().Users(),
530			)
531		case "stripecoinpayments":
532			stripeClient = stripecoinpayments.NewStripeClient(log, pc.StripeCoinPayments)
533		}
534
535		peer.Payments.Service, err = stripecoinpayments.NewService(
536			peer.Log.Named("payments.stripe:service"),
537			stripeClient,
538			pc.StripeCoinPayments,
539			peer.DB.StripeCoinPayments(),
540			peer.DB.Console().Projects(),
541			peer.DB.ProjectAccounting(),
542			pc.StorageTBPrice,
543			pc.EgressTBPrice,
544			pc.SegmentPrice,
545			pc.BonusRate)
546
547		if err != nil {
548			return nil, errs.Combine(err, peer.Close())
549		}
550
551		peer.Payments.Stripe = stripeClient
552		peer.Payments.Accounts = peer.Payments.Service.Accounts()
553		peer.Payments.Conversion = stripecoinpayments.NewConversionService(
554			peer.Log.Named("payments.stripe:version"),
555			peer.Payments.Service,
556			pc.StripeCoinPayments.ConversionRatesCycleInterval)
557
558		peer.Services.Add(lifecycle.Item{
559			Name:  "payments.stripe:version",
560			Run:   peer.Payments.Conversion.Run,
561			Close: peer.Payments.Conversion.Close,
562		})
563	}
564
565	{ // setup console
566		consoleConfig := config.Console
567		peer.Console.Listener, err = net.Listen("tcp", consoleConfig.Address)
568		if err != nil {
569			return nil, errs.Combine(err, peer.Close())
570		}
571		if consoleConfig.AuthTokenSecret == "" {
572			return nil, errs.New("Auth token secret required")
573		}
574
575		peer.Console.Service, err = console.NewService(
576			peer.Log.Named("console:service"),
577			&consoleauth.Hmac{Secret: []byte(consoleConfig.AuthTokenSecret)},
578			peer.DB.Console(),
579			peer.DB.ProjectAccounting(),
580			peer.Accounting.ProjectUsage,
581			peer.Buckets.Service,
582			peer.Marketing.PartnersService,
583			peer.Payments.Accounts,
584			peer.Analytics.Service,
585			consoleConfig.Config,
586		)
587		if err != nil {
588			return nil, errs.Combine(err, peer.Close())
589		}
590
591		pricing := paymentsconfig.PricingValues{
592			StorageTBPrice: config.Payments.StorageTBPrice,
593			EgressTBPrice:  config.Payments.EgressTBPrice,
594			SegmentPrice:   config.Payments.SegmentPrice,
595		}
596
597		peer.Console.Endpoint = consoleweb.NewServer(
598			peer.Log.Named("console:endpoint"),
599			consoleConfig,
600			peer.Console.Service,
601			peer.Mail.Service,
602			peer.Marketing.PartnersService,
603			peer.Analytics.Service,
604			peer.Console.Listener,
605			config.Payments.StripeCoinPayments.StripePublicKey,
606			pricing,
607			peer.URL(),
608		)
609
610		peer.Servers.Add(lifecycle.Item{
611			Name:  "console:endpoint",
612			Run:   peer.Console.Endpoint.Run,
613			Close: peer.Console.Endpoint.Close,
614		})
615	}
616
617	{ // setup node stats endpoint
618		peer.NodeStats.Endpoint = nodestats.NewEndpoint(
619			peer.Log.Named("nodestats:endpoint"),
620			peer.Overlay.DB,
621			peer.Reputation.Service,
622			peer.DB.StoragenodeAccounting(),
623			config.Payments,
624		)
625		if err := pb.DRPCRegisterNodeStats(peer.Server.DRPC(), peer.NodeStats.Endpoint); err != nil {
626			return nil, errs.Combine(err, peer.Close())
627		}
628	}
629
630	{ // setup SnoPayout endpoint
631		peer.SNOPayouts.DB = peer.DB.SNOPayouts()
632		peer.SNOPayouts.Service = snopayouts.NewService(
633			peer.Log.Named("payouts:service"),
634			peer.SNOPayouts.DB)
635		peer.SNOPayouts.Endpoint = snopayouts.NewEndpoint(
636			peer.Log.Named("payouts:endpoint"),
637			peer.DB.StoragenodeAccounting(),
638			peer.Overlay.DB,
639			peer.SNOPayouts.Service)
640		if err := pb.DRPCRegisterHeldAmount(peer.Server.DRPC(), peer.SNOPayouts.Endpoint); err != nil {
641			return nil, errs.Combine(err, peer.Close())
642		}
643	}
644
645	{ // setup graceful exit
646		if config.GracefulExit.Enabled {
647			peer.GracefulExit.Endpoint = gracefulexit.NewEndpoint(
648				peer.Log.Named("gracefulexit:endpoint"),
649				signing.SignerFromFullIdentity(peer.Identity),
650				peer.DB.GracefulExit(),
651				peer.Overlay.DB,
652				peer.Overlay.Service,
653				peer.Reputation.Service,
654				peer.Metainfo.Metabase,
655				peer.Orders.Service,
656				peer.DB.PeerIdentities(),
657				config.GracefulExit)
658
659			if err := pb.DRPCRegisterSatelliteGracefulExit(peer.Server.DRPC(), peer.GracefulExit.Endpoint); err != nil {
660				return nil, errs.Combine(err, peer.Close())
661			}
662		} else {
663			peer.Log.Named("gracefulexit").Info("disabled")
664		}
665	}
666
667	return peer, nil
668}
669
670// Run runs satellite until it's either closed or it errors.
671func (peer *API) Run(ctx context.Context) (err error) {
672	defer mon.Task()(&ctx)(&err)
673
674	group, ctx := errgroup.WithContext(ctx)
675
676	peer.Servers.Run(ctx, group)
677	peer.Services.Run(ctx, group)
678
679	return group.Wait()
680}
681
682// Close closes all the resources.
683func (peer *API) Close() error {
684	return errs.Combine(
685		peer.Servers.Close(),
686		peer.Services.Close(),
687	)
688}
689
690// ID returns the peer ID.
691func (peer *API) ID() storj.NodeID { return peer.Identity.ID }
692
693// Addr returns the public address.
694func (peer *API) Addr() string {
695	return peer.ExternalAddress
696}
697
698// URL returns the storj.NodeURL.
699func (peer *API) URL() storj.NodeURL {
700	return storj.NodeURL{ID: peer.ID(), Address: peer.Addr()}
701}
702
703// PrivateAddr returns the private address.
704func (peer *API) PrivateAddr() string { return peer.Server.PrivateAddr().String() }
705