1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package satellite
5
6import (
7	"context"
8	"errors"
9	"net"
10
11	"github.com/spacemonkeygo/monkit/v3"
12	"github.com/zeebo/errs"
13	"go.uber.org/zap"
14	"golang.org/x/sync/errgroup"
15
16	"storj.io/common/identity"
17	"storj.io/common/peertls/extensions"
18	"storj.io/common/peertls/tlsopts"
19	"storj.io/common/rpc"
20	"storj.io/common/signing"
21	"storj.io/common/storj"
22	"storj.io/private/debug"
23	"storj.io/private/version"
24	"storj.io/storj/private/lifecycle"
25	version_checker "storj.io/storj/private/version/checker"
26	"storj.io/storj/satellite/accounting"
27	"storj.io/storj/satellite/accounting/nodetally"
28	"storj.io/storj/satellite/accounting/projectbwcleanup"
29	"storj.io/storj/satellite/accounting/rollup"
30	"storj.io/storj/satellite/accounting/rolluparchive"
31	"storj.io/storj/satellite/accounting/tally"
32	"storj.io/storj/satellite/audit"
33	"storj.io/storj/satellite/buckets"
34	"storj.io/storj/satellite/gracefulexit"
35	"storj.io/storj/satellite/metabase"
36	"storj.io/storj/satellite/metabase/segmentloop"
37	"storj.io/storj/satellite/metabase/zombiedeletion"
38	"storj.io/storj/satellite/metainfo/expireddeletion"
39	"storj.io/storj/satellite/metrics"
40	"storj.io/storj/satellite/orders"
41	"storj.io/storj/satellite/overlay"
42	"storj.io/storj/satellite/overlay/straynodes"
43	"storj.io/storj/satellite/payments"
44	"storj.io/storj/satellite/payments/stripecoinpayments"
45	"storj.io/storj/satellite/repair/checker"
46	"storj.io/storj/satellite/reputation"
47)
48
49// Core is the satellite core process that runs chores.
50//
51// architecture: Peer
52type Core struct {
53	// core dependencies
54	Log      *zap.Logger
55	Identity *identity.FullIdentity
56	DB       DB
57
58	Servers  *lifecycle.Group
59	Services *lifecycle.Group
60
61	Dialer rpc.Dialer
62
63	Version struct {
64		Chore   *version_checker.Chore
65		Service *version_checker.Service
66	}
67
68	Debug struct {
69		Listener net.Listener
70		Server   *debug.Server
71	}
72
73	// services and endpoints
74	Overlay struct {
75		DB           overlay.DB
76		Service      *overlay.Service
77		DQStrayNodes *straynodes.Chore
78	}
79
80	Metainfo struct {
81		Metabase    *metabase.DB
82		SegmentLoop *segmentloop.Service
83	}
84
85	Orders struct {
86		DB      orders.DB
87		Service *orders.Service
88		Chore   *orders.Chore
89	}
90
91	Reputation struct {
92		Service *reputation.Service
93	}
94
95	Repair struct {
96		Checker *checker.Checker
97	}
98
99	Audit struct {
100		Queues   *audit.Queues
101		Worker   *audit.Worker
102		Chore    *audit.Chore
103		Verifier *audit.Verifier
104		Reporter *audit.Reporter
105	}
106
107	ExpiredDeletion struct {
108		Chore *expireddeletion.Chore
109	}
110
111	ZombieDeletion struct {
112		Chore *zombiedeletion.Chore
113	}
114
115	Accounting struct {
116		Tally                 *tally.Service
117		NodeTally             *nodetally.Service
118		Rollup                *rollup.Service
119		RollupArchiveChore    *rolluparchive.Chore
120		ProjectBWCleanupChore *projectbwcleanup.Chore
121	}
122
123	LiveAccounting struct {
124		Cache accounting.Cache
125	}
126
127	Payments struct {
128		Accounts payments.Accounts
129		Chore    *stripecoinpayments.Chore
130	}
131
132	GracefulExit struct {
133		Chore *gracefulexit.Chore
134	}
135
136	Metrics struct {
137		Chore *metrics.Chore
138	}
139
140	Buckets struct {
141		Service *buckets.Service
142	}
143}
144
145// New creates a new satellite.
146func New(log *zap.Logger, full *identity.FullIdentity, db DB,
147	metabaseDB *metabase.DB, revocationDB extensions.RevocationDB,
148	liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache,
149	versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Core, error) {
150	peer := &Core{
151		Log:      log,
152		Identity: full,
153		DB:       db,
154
155		Servers:  lifecycle.NewGroup(log.Named("servers")),
156		Services: lifecycle.NewGroup(log.Named("services")),
157	}
158
159	{ // setup buckets service
160		peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB)
161	}
162
163	{ // setup debug
164		var err error
165		if config.Debug.Address != "" {
166			peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address)
167			if err != nil {
168				withoutStack := errors.New(err.Error())
169				peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack))
170			}
171		}
172		debugConfig := config.Debug
173		debugConfig.ControlTitle = "Core"
174		peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel)
175		peer.Servers.Add(lifecycle.Item{
176			Name:  "debug",
177			Run:   peer.Debug.Server.Run,
178			Close: peer.Debug.Server.Close,
179		})
180	}
181
182	var err error
183
184	{ // setup version control
185		peer.Log.Info("Version info",
186			zap.Stringer("Version", versionInfo.Version.Version),
187			zap.String("Commit Hash", versionInfo.CommitHash),
188			zap.Stringer("Build Timestamp", versionInfo.Timestamp),
189			zap.Bool("Release Build", versionInfo.Release),
190		)
191		peer.Version.Service = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite")
192		peer.Version.Chore = version_checker.NewChore(peer.Version.Service, config.Version.CheckInterval)
193
194		peer.Services.Add(lifecycle.Item{
195			Name: "version",
196			Run:  peer.Version.Chore.Run,
197		})
198	}
199
200	{ // setup listener and server
201		sc := config.Server
202
203		tlsOptions, err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB)
204		if err != nil {
205			return nil, errs.Combine(err, peer.Close())
206		}
207
208		peer.Dialer = rpc.NewDefaultDialer(tlsOptions)
209	}
210
211	{ // setup overlay
212		peer.Overlay.DB = peer.DB.OverlayCache()
213		peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay)
214		if err != nil {
215			return nil, errs.Combine(err, peer.Close())
216		}
217		peer.Services.Add(lifecycle.Item{
218			Name:  "overlay",
219			Close: peer.Overlay.Service.Close,
220		})
221
222		if config.StrayNodes.EnableDQ {
223			peer.Overlay.DQStrayNodes = straynodes.NewChore(peer.Log.Named("overlay:dq-stray-nodes"), peer.Overlay.DB, config.StrayNodes)
224			peer.Services.Add(lifecycle.Item{
225				Name:  "overlay:dq-stray-nodes",
226				Run:   peer.Overlay.DQStrayNodes.Run,
227				Close: peer.Overlay.DQStrayNodes.Close,
228			})
229			peer.Debug.Server.Panel.Add(
230				debug.Cycle("Overlay DQ Stray Nodes", peer.Overlay.DQStrayNodes.Loop))
231		}
232	}
233
234	{ // setup live accounting
235		peer.LiveAccounting.Cache = liveAccounting
236	}
237
238	{ // setup orders
239		peer.Orders.DB = rollupsWriteCache
240		peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders)
241		peer.Services.Add(lifecycle.Item{
242			Name:  "orders:chore",
243			Run:   peer.Orders.Chore.Run,
244			Close: peer.Orders.Chore.Close,
245		})
246		var err error
247		peer.Orders.Service, err = orders.NewService(
248			peer.Log.Named("orders:service"),
249			signing.SignerFromFullIdentity(peer.Identity),
250			peer.Overlay.Service,
251			peer.Orders.DB,
252			peer.Buckets.Service,
253			config.Orders,
254		)
255		if err != nil {
256			return nil, errs.Combine(err, peer.Close())
257		}
258	}
259
260	{ // setup metainfo
261		peer.Metainfo.Metabase = metabaseDB
262
263		peer.Metainfo.SegmentLoop = segmentloop.New(
264			peer.Log.Named("metainfo:segmentloop"),
265			config.Metainfo.SegmentLoop,
266			peer.Metainfo.Metabase,
267		)
268		peer.Services.Add(lifecycle.Item{
269			Name:  "metainfo:segmentloop",
270			Run:   peer.Metainfo.SegmentLoop.Run,
271			Close: peer.Metainfo.SegmentLoop.Close,
272		})
273	}
274
275	{ // setup datarepair
276		// TODO: simplify argument list somehow
277		peer.Repair.Checker = checker.NewChecker(
278			peer.Log.Named("repair:checker"),
279			peer.DB.RepairQueue(),
280			peer.Metainfo.Metabase,
281			peer.Metainfo.SegmentLoop,
282			peer.Overlay.Service,
283			config.Checker)
284		peer.Services.Add(lifecycle.Item{
285			Name:  "repair:checker",
286			Run:   peer.Repair.Checker.Run,
287			Close: peer.Repair.Checker.Close,
288		})
289		peer.Debug.Server.Panel.Add(
290			debug.Cycle("Repair Checker", peer.Repair.Checker.Loop))
291	}
292
293	{ // setup reputation
294		peer.Reputation.Service = reputation.NewService(log.Named("reputation:service"),
295			peer.Overlay.DB,
296			peer.DB.Reputation(),
297			config.Reputation,
298		)
299		peer.Services.Add(lifecycle.Item{
300			Name:  "reputation",
301			Close: peer.Reputation.Service.Close,
302		})
303	}
304
305	{ // setup audit
306		config := config.Audit
307
308		peer.Audit.Queues = audit.NewQueues()
309
310		peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"),
311			peer.Metainfo.Metabase,
312			peer.Dialer,
313			peer.Overlay.Service,
314			peer.DB.Containment(),
315			peer.Orders.Service,
316			peer.Identity,
317			config.MinBytesPerSecond,
318			config.MinDownloadTimeout,
319		)
320
321		peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"),
322			peer.Reputation.Service,
323			peer.DB.Containment(),
324			config.MaxRetriesStatDB,
325			int32(config.MaxReverifyCount),
326		)
327
328		peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit:worker"),
329			peer.Audit.Queues,
330			peer.Audit.Verifier,
331			peer.Audit.Reporter,
332			config,
333		)
334		peer.Services.Add(lifecycle.Item{
335			Name:  "audit:worker",
336			Run:   peer.Audit.Worker.Run,
337			Close: peer.Audit.Worker.Close,
338		})
339		peer.Debug.Server.Panel.Add(
340			debug.Cycle("Audit Worker", peer.Audit.Worker.Loop))
341
342		if err != nil {
343			return nil, errs.Combine(err, peer.Close())
344		}
345
346		peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"),
347			peer.Audit.Queues,
348			peer.Metainfo.SegmentLoop,
349			config,
350		)
351		peer.Services.Add(lifecycle.Item{
352			Name:  "audit:chore",
353			Run:   peer.Audit.Chore.Run,
354			Close: peer.Audit.Chore.Close,
355		})
356		peer.Debug.Server.Panel.Add(
357			debug.Cycle("Audit Chore", peer.Audit.Chore.Loop))
358	}
359
360	{ // setup expired segment cleanup
361		peer.ExpiredDeletion.Chore = expireddeletion.NewChore(
362			peer.Log.Named("core-expired-deletion"),
363			config.ExpiredDeletion,
364			peer.Metainfo.Metabase,
365		)
366		peer.Services.Add(lifecycle.Item{
367			Name:  "expireddeletion:chore",
368			Run:   peer.ExpiredDeletion.Chore.Run,
369			Close: peer.ExpiredDeletion.Chore.Close,
370		})
371		peer.Debug.Server.Panel.Add(
372			debug.Cycle("Expired Segments Chore", peer.ExpiredDeletion.Chore.Loop))
373	}
374
375	{ // setup zombie objects cleanup
376		peer.ZombieDeletion.Chore = zombiedeletion.NewChore(
377			peer.Log.Named("core-zombie-deletion"),
378			config.ZombieDeletion,
379			peer.Metainfo.Metabase,
380		)
381		peer.Services.Add(lifecycle.Item{
382			Name:  "zombiedeletion:chore",
383			Run:   peer.ZombieDeletion.Chore.Run,
384			Close: peer.ZombieDeletion.Chore.Close,
385		})
386		peer.Debug.Server.Panel.Add(
387			debug.Cycle("Zombie Objects Chore", peer.ZombieDeletion.Chore.Loop))
388	}
389
390	{ // setup accounting
391		peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Metabase, config.Tally)
392		peer.Services.Add(lifecycle.Item{
393			Name:  "accounting:tally",
394			Run:   peer.Accounting.Tally.Run,
395			Close: peer.Accounting.Tally.Close,
396		})
397		peer.Debug.Server.Panel.Add(
398			debug.Cycle("Accounting Tally", peer.Accounting.Tally.Loop))
399
400		// storage nodes tally
401		peer.Accounting.NodeTally = nodetally.New(peer.Log.Named("accounting:nodetally"), peer.DB.StoragenodeAccounting(), peer.Metainfo.SegmentLoop, config.Tally.Interval)
402		peer.Services.Add(lifecycle.Item{
403			Name:  "accounting:nodetally",
404			Run:   peer.Accounting.NodeTally.Run,
405			Close: peer.Accounting.NodeTally.Close,
406		})
407
408		// Lets add 1 more day so we catch any off by one errors when deleting tallies
409		orderExpirationPlusDay := config.Orders.Expiration + config.Rollup.Interval
410		peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies, orderExpirationPlusDay)
411		peer.Services.Add(lifecycle.Item{
412			Name:  "accounting:rollup",
413			Run:   peer.Accounting.Rollup.Run,
414			Close: peer.Accounting.Rollup.Close,
415		})
416		peer.Debug.Server.Panel.Add(
417			debug.Cycle("Accounting Rollup", peer.Accounting.Rollup.Loop))
418
419		peer.Accounting.ProjectBWCleanupChore = projectbwcleanup.NewChore(peer.Log.Named("accounting:chore"), peer.DB.ProjectAccounting(), config.ProjectBWCleanup)
420		peer.Services.Add(lifecycle.Item{
421			Name:  "accounting:project-bw-rollup",
422			Run:   peer.Accounting.ProjectBWCleanupChore.Run,
423			Close: peer.Accounting.ProjectBWCleanupChore.Close,
424		})
425		peer.Debug.Server.Panel.Add(
426			debug.Cycle("Accounting Project Bandwidth Rollup", peer.Accounting.ProjectBWCleanupChore.Loop))
427
428		if config.RollupArchive.Enabled {
429			peer.Accounting.RollupArchiveChore = rolluparchive.New(peer.Log.Named("accounting:rollup-archive"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), config.RollupArchive)
430			peer.Services.Add(lifecycle.Item{
431				Name:  "accounting:rollup-archive",
432				Run:   peer.Accounting.RollupArchiveChore.Run,
433				Close: peer.Accounting.RollupArchiveChore.Close,
434			})
435			peer.Debug.Server.Panel.Add(
436				debug.Cycle("Accounting Rollup Archive", peer.Accounting.RollupArchiveChore.Loop))
437		} else {
438			peer.Log.Named("rolluparchive").Info("disabled")
439		}
440	}
441
442	// TODO: remove in future, should be in API
443	{ // setup payments
444		pc := config.Payments
445
446		var stripeClient stripecoinpayments.StripeClient
447		switch pc.Provider {
448		default:
449			stripeClient = stripecoinpayments.NewStripeMock(
450				peer.ID(),
451				peer.DB.StripeCoinPayments().Customers(),
452				peer.DB.Console().Users(),
453			)
454		case "stripecoinpayments":
455			stripeClient = stripecoinpayments.NewStripeClient(log, pc.StripeCoinPayments)
456		}
457
458		service, err := stripecoinpayments.NewService(
459			peer.Log.Named("payments.stripe:service"),
460			stripeClient,
461			pc.StripeCoinPayments,
462			peer.DB.StripeCoinPayments(),
463			peer.DB.Console().Projects(),
464			peer.DB.ProjectAccounting(),
465			pc.StorageTBPrice,
466			pc.EgressTBPrice,
467			pc.SegmentPrice,
468			pc.BonusRate)
469		if err != nil {
470			return nil, errs.Combine(err, peer.Close())
471		}
472
473		peer.Payments.Accounts = service.Accounts()
474
475		peer.Payments.Chore = stripecoinpayments.NewChore(
476			peer.Log.Named("payments.stripe:clearing"),
477			service,
478			pc.StripeCoinPayments.TransactionUpdateInterval,
479			pc.StripeCoinPayments.AccountBalanceUpdateInterval,
480		)
481		peer.Services.Add(lifecycle.Item{
482			Name: "payments.stripe:service",
483			Run:  peer.Payments.Chore.Run,
484		})
485		peer.Debug.Server.Panel.Add(
486			debug.Cycle("Payments Stripe Transactions", peer.Payments.Chore.TransactionCycle),
487			debug.Cycle("Payments Stripe Account Balance", peer.Payments.Chore.AccountBalanceCycle),
488		)
489	}
490
491	{ // setup graceful exit
492		if config.GracefulExit.Enabled {
493			peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("gracefulexit"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.SegmentLoop, config.GracefulExit)
494			peer.Services.Add(lifecycle.Item{
495				Name:  "gracefulexit",
496				Run:   peer.GracefulExit.Chore.Run,
497				Close: peer.GracefulExit.Chore.Close,
498			})
499			peer.Debug.Server.Panel.Add(
500				debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
501		} else {
502			peer.Log.Named("gracefulexit").Info("disabled")
503		}
504	}
505
506	{ // setup metrics service
507		peer.Metrics.Chore = metrics.NewChore(
508			peer.Log.Named("metrics"),
509			config.Metrics,
510			peer.Metainfo.SegmentLoop,
511		)
512		peer.Services.Add(lifecycle.Item{
513			Name:  "metrics",
514			Run:   peer.Metrics.Chore.Run,
515			Close: peer.Metrics.Chore.Close,
516		})
517		peer.Debug.Server.Panel.Add(
518			debug.Cycle("Metrics", peer.Metrics.Chore.Loop))
519	}
520
521	return peer, nil
522}
523
524// Run runs satellite until it's either closed or it errors.
525func (peer *Core) Run(ctx context.Context) (err error) {
526	defer mon.Task()(&ctx)(&err)
527
528	group, ctx := errgroup.WithContext(ctx)
529
530	peer.Servers.Run(ctx, group)
531	peer.Services.Run(ctx, group)
532
533	return group.Wait()
534}
535
536// Close closes all the resources.
537func (peer *Core) Close() error {
538	return errs.Combine(
539		peer.Servers.Close(),
540		peer.Services.Close(),
541	)
542}
543
544// ID returns the peer ID.
545func (peer *Core) ID() storj.NodeID { return peer.Identity.ID }
546