1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package satellite 5 6import ( 7 "context" 8 "errors" 9 "net" 10 11 "github.com/spacemonkeygo/monkit/v3" 12 "github.com/zeebo/errs" 13 "go.uber.org/zap" 14 "golang.org/x/sync/errgroup" 15 16 "storj.io/common/identity" 17 "storj.io/common/peertls/extensions" 18 "storj.io/common/peertls/tlsopts" 19 "storj.io/common/rpc" 20 "storj.io/common/signing" 21 "storj.io/common/storj" 22 "storj.io/private/debug" 23 "storj.io/private/version" 24 "storj.io/storj/private/lifecycle" 25 version_checker "storj.io/storj/private/version/checker" 26 "storj.io/storj/satellite/accounting" 27 "storj.io/storj/satellite/accounting/nodetally" 28 "storj.io/storj/satellite/accounting/projectbwcleanup" 29 "storj.io/storj/satellite/accounting/rollup" 30 "storj.io/storj/satellite/accounting/rolluparchive" 31 "storj.io/storj/satellite/accounting/tally" 32 "storj.io/storj/satellite/audit" 33 "storj.io/storj/satellite/buckets" 34 "storj.io/storj/satellite/gracefulexit" 35 "storj.io/storj/satellite/metabase" 36 "storj.io/storj/satellite/metabase/segmentloop" 37 "storj.io/storj/satellite/metabase/zombiedeletion" 38 "storj.io/storj/satellite/metainfo/expireddeletion" 39 "storj.io/storj/satellite/metrics" 40 "storj.io/storj/satellite/orders" 41 "storj.io/storj/satellite/overlay" 42 "storj.io/storj/satellite/overlay/straynodes" 43 "storj.io/storj/satellite/payments" 44 "storj.io/storj/satellite/payments/stripecoinpayments" 45 "storj.io/storj/satellite/repair/checker" 46 "storj.io/storj/satellite/reputation" 47) 48 49// Core is the satellite core process that runs chores. 
//
// It bundles the logger, identity and database handle with the lifecycle
// groups and the per-subsystem services and chores that New wires together.
//
// architecture: Peer
type Core struct {
	// core dependencies
	Log      *zap.Logger
	Identity *identity.FullIdentity
	DB       DB

	// Servers and Services are the lifecycle groups started by Run and shut
	// down by Close. New registers the debug server in Servers and all other
	// items in Services.
	Servers  *lifecycle.Group
	Services *lifecycle.Group

	// Dialer is created in New from TLS options built for Identity.
	Dialer rpc.Dialer

	// Version holds the version checker service and the chore driving it.
	Version struct {
		Chore   *version_checker.Chore
		Service *version_checker.Service
	}

	// Debug holds the debug server and its listener. New only opens the
	// listener when config.Debug.Address is non-empty, and it carries on
	// (logging at debug level) when listening fails.
	Debug struct {
		Listener net.Listener
		Server   *debug.Server
	}

	// services and endpoints

	// Overlay holds the overlay cache and service. DQStrayNodes is only set
	// when config.StrayNodes.EnableDQ is true.
	Overlay struct {
		DB           overlay.DB
		Service      *overlay.Service
		DQStrayNodes *straynodes.Chore
	}

	// Metainfo holds the metabase handle passed to New and the segment loop
	// built on top of it.
	Metainfo struct {
		Metabase    *metabase.DB
		SegmentLoop *segmentloop.Service
	}

	// Orders holds the orders service and chore. DB is set to the rollups
	// write cache passed to New.
	Orders struct {
		DB      orders.DB
		Service *orders.Service
		Chore   *orders.Chore
	}

	// Reputation holds the reputation service.
	Reputation struct {
		Service *reputation.Service
	}

	// Repair holds the repair checker.
	Repair struct {
		Checker *checker.Checker
	}

	// Audit holds the audit queues together with the worker, chore, verifier
	// and reporter that New constructs around them.
	Audit struct {
		Queues   *audit.Queues
		Worker   *audit.Worker
		Chore    *audit.Chore
		Verifier *audit.Verifier
		Reporter *audit.Reporter
	}

	// ExpiredDeletion holds the expired segment cleanup chore.
	ExpiredDeletion struct {
		Chore *expireddeletion.Chore
	}

	// ZombieDeletion holds the zombie objects cleanup chore.
	ZombieDeletion struct {
		Chore *zombiedeletion.Chore
	}

	// Accounting holds the tally, node tally and rollup services plus the
	// cleanup chores. RollupArchiveChore is only set when
	// config.RollupArchive.Enabled is true.
	Accounting struct {
		Tally                 *tally.Service
		NodeTally             *nodetally.Service
		Rollup                *rollup.Service
		RollupArchiveChore    *rolluparchive.Chore
		ProjectBWCleanupChore *projectbwcleanup.Chore
	}

	// LiveAccounting holds the live accounting cache passed to New.
	LiveAccounting struct {
		Cache accounting.Cache
	}

	// Payments holds the accounts obtained from the stripecoinpayments
	// service and the chore constructed from that service.
	Payments struct {
		Accounts payments.Accounts
		Chore    *stripecoinpayments.Chore
	}

	// GracefulExit holds the graceful exit chore. It is only set when
	// config.GracefulExit.Enabled is true.
	GracefulExit struct {
		Chore *gracefulexit.Chore
	}

	// Metrics holds the metrics chore.
	Metrics struct {
		Chore *metrics.Chore
	}

	// Buckets holds the buckets service.
	Buckets struct {
		Service *buckets.Service
	}
}

// New creates a new satellite.
146func New(log *zap.Logger, full *identity.FullIdentity, db DB, 147 metabaseDB *metabase.DB, revocationDB extensions.RevocationDB, 148 liveAccounting accounting.Cache, rollupsWriteCache *orders.RollupsWriteCache, 149 versionInfo version.Info, config *Config, atomicLogLevel *zap.AtomicLevel) (*Core, error) { 150 peer := &Core{ 151 Log: log, 152 Identity: full, 153 DB: db, 154 155 Servers: lifecycle.NewGroup(log.Named("servers")), 156 Services: lifecycle.NewGroup(log.Named("services")), 157 } 158 159 { // setup buckets service 160 peer.Buckets.Service = buckets.NewService(db.Buckets(), metabaseDB) 161 } 162 163 { // setup debug 164 var err error 165 if config.Debug.Address != "" { 166 peer.Debug.Listener, err = net.Listen("tcp", config.Debug.Address) 167 if err != nil { 168 withoutStack := errors.New(err.Error()) 169 peer.Log.Debug("failed to start debug endpoints", zap.Error(withoutStack)) 170 } 171 } 172 debugConfig := config.Debug 173 debugConfig.ControlTitle = "Core" 174 peer.Debug.Server = debug.NewServerWithAtomicLevel(log.Named("debug"), peer.Debug.Listener, monkit.Default, debugConfig, atomicLogLevel) 175 peer.Servers.Add(lifecycle.Item{ 176 Name: "debug", 177 Run: peer.Debug.Server.Run, 178 Close: peer.Debug.Server.Close, 179 }) 180 } 181 182 var err error 183 184 { // setup version control 185 peer.Log.Info("Version info", 186 zap.Stringer("Version", versionInfo.Version.Version), 187 zap.String("Commit Hash", versionInfo.CommitHash), 188 zap.Stringer("Build Timestamp", versionInfo.Timestamp), 189 zap.Bool("Release Build", versionInfo.Release), 190 ) 191 peer.Version.Service = version_checker.NewService(log.Named("version"), config.Version, versionInfo, "Satellite") 192 peer.Version.Chore = version_checker.NewChore(peer.Version.Service, config.Version.CheckInterval) 193 194 peer.Services.Add(lifecycle.Item{ 195 Name: "version", 196 Run: peer.Version.Chore.Run, 197 }) 198 } 199 200 { // setup listener and server 201 sc := config.Server 202 203 tlsOptions, 
err := tlsopts.NewOptions(peer.Identity, sc.Config, revocationDB) 204 if err != nil { 205 return nil, errs.Combine(err, peer.Close()) 206 } 207 208 peer.Dialer = rpc.NewDefaultDialer(tlsOptions) 209 } 210 211 { // setup overlay 212 peer.Overlay.DB = peer.DB.OverlayCache() 213 peer.Overlay.Service, err = overlay.NewService(peer.Log.Named("overlay"), peer.Overlay.DB, config.Overlay) 214 if err != nil { 215 return nil, errs.Combine(err, peer.Close()) 216 } 217 peer.Services.Add(lifecycle.Item{ 218 Name: "overlay", 219 Close: peer.Overlay.Service.Close, 220 }) 221 222 if config.StrayNodes.EnableDQ { 223 peer.Overlay.DQStrayNodes = straynodes.NewChore(peer.Log.Named("overlay:dq-stray-nodes"), peer.Overlay.DB, config.StrayNodes) 224 peer.Services.Add(lifecycle.Item{ 225 Name: "overlay:dq-stray-nodes", 226 Run: peer.Overlay.DQStrayNodes.Run, 227 Close: peer.Overlay.DQStrayNodes.Close, 228 }) 229 peer.Debug.Server.Panel.Add( 230 debug.Cycle("Overlay DQ Stray Nodes", peer.Overlay.DQStrayNodes.Loop)) 231 } 232 } 233 234 { // setup live accounting 235 peer.LiveAccounting.Cache = liveAccounting 236 } 237 238 { // setup orders 239 peer.Orders.DB = rollupsWriteCache 240 peer.Orders.Chore = orders.NewChore(log.Named("orders:chore"), rollupsWriteCache, config.Orders) 241 peer.Services.Add(lifecycle.Item{ 242 Name: "orders:chore", 243 Run: peer.Orders.Chore.Run, 244 Close: peer.Orders.Chore.Close, 245 }) 246 var err error 247 peer.Orders.Service, err = orders.NewService( 248 peer.Log.Named("orders:service"), 249 signing.SignerFromFullIdentity(peer.Identity), 250 peer.Overlay.Service, 251 peer.Orders.DB, 252 peer.Buckets.Service, 253 config.Orders, 254 ) 255 if err != nil { 256 return nil, errs.Combine(err, peer.Close()) 257 } 258 } 259 260 { // setup metainfo 261 peer.Metainfo.Metabase = metabaseDB 262 263 peer.Metainfo.SegmentLoop = segmentloop.New( 264 peer.Log.Named("metainfo:segmentloop"), 265 config.Metainfo.SegmentLoop, 266 peer.Metainfo.Metabase, 267 ) 268 
peer.Services.Add(lifecycle.Item{ 269 Name: "metainfo:segmentloop", 270 Run: peer.Metainfo.SegmentLoop.Run, 271 Close: peer.Metainfo.SegmentLoop.Close, 272 }) 273 } 274 275 { // setup datarepair 276 // TODO: simplify argument list somehow 277 peer.Repair.Checker = checker.NewChecker( 278 peer.Log.Named("repair:checker"), 279 peer.DB.RepairQueue(), 280 peer.Metainfo.Metabase, 281 peer.Metainfo.SegmentLoop, 282 peer.Overlay.Service, 283 config.Checker) 284 peer.Services.Add(lifecycle.Item{ 285 Name: "repair:checker", 286 Run: peer.Repair.Checker.Run, 287 Close: peer.Repair.Checker.Close, 288 }) 289 peer.Debug.Server.Panel.Add( 290 debug.Cycle("Repair Checker", peer.Repair.Checker.Loop)) 291 } 292 293 { // setup reputation 294 peer.Reputation.Service = reputation.NewService(log.Named("reputation:service"), 295 peer.Overlay.DB, 296 peer.DB.Reputation(), 297 config.Reputation, 298 ) 299 peer.Services.Add(lifecycle.Item{ 300 Name: "reputation", 301 Close: peer.Reputation.Service.Close, 302 }) 303 } 304 305 { // setup audit 306 config := config.Audit 307 308 peer.Audit.Queues = audit.NewQueues() 309 310 peer.Audit.Verifier = audit.NewVerifier(log.Named("audit:verifier"), 311 peer.Metainfo.Metabase, 312 peer.Dialer, 313 peer.Overlay.Service, 314 peer.DB.Containment(), 315 peer.Orders.Service, 316 peer.Identity, 317 config.MinBytesPerSecond, 318 config.MinDownloadTimeout, 319 ) 320 321 peer.Audit.Reporter = audit.NewReporter(log.Named("audit:reporter"), 322 peer.Reputation.Service, 323 peer.DB.Containment(), 324 config.MaxRetriesStatDB, 325 int32(config.MaxReverifyCount), 326 ) 327 328 peer.Audit.Worker, err = audit.NewWorker(peer.Log.Named("audit:worker"), 329 peer.Audit.Queues, 330 peer.Audit.Verifier, 331 peer.Audit.Reporter, 332 config, 333 ) 334 peer.Services.Add(lifecycle.Item{ 335 Name: "audit:worker", 336 Run: peer.Audit.Worker.Run, 337 Close: peer.Audit.Worker.Close, 338 }) 339 peer.Debug.Server.Panel.Add( 340 debug.Cycle("Audit Worker", peer.Audit.Worker.Loop)) 
341 342 if err != nil { 343 return nil, errs.Combine(err, peer.Close()) 344 } 345 346 peer.Audit.Chore = audit.NewChore(peer.Log.Named("audit:chore"), 347 peer.Audit.Queues, 348 peer.Metainfo.SegmentLoop, 349 config, 350 ) 351 peer.Services.Add(lifecycle.Item{ 352 Name: "audit:chore", 353 Run: peer.Audit.Chore.Run, 354 Close: peer.Audit.Chore.Close, 355 }) 356 peer.Debug.Server.Panel.Add( 357 debug.Cycle("Audit Chore", peer.Audit.Chore.Loop)) 358 } 359 360 { // setup expired segment cleanup 361 peer.ExpiredDeletion.Chore = expireddeletion.NewChore( 362 peer.Log.Named("core-expired-deletion"), 363 config.ExpiredDeletion, 364 peer.Metainfo.Metabase, 365 ) 366 peer.Services.Add(lifecycle.Item{ 367 Name: "expireddeletion:chore", 368 Run: peer.ExpiredDeletion.Chore.Run, 369 Close: peer.ExpiredDeletion.Chore.Close, 370 }) 371 peer.Debug.Server.Panel.Add( 372 debug.Cycle("Expired Segments Chore", peer.ExpiredDeletion.Chore.Loop)) 373 } 374 375 { // setup zombie objects cleanup 376 peer.ZombieDeletion.Chore = zombiedeletion.NewChore( 377 peer.Log.Named("core-zombie-deletion"), 378 config.ZombieDeletion, 379 peer.Metainfo.Metabase, 380 ) 381 peer.Services.Add(lifecycle.Item{ 382 Name: "zombiedeletion:chore", 383 Run: peer.ZombieDeletion.Chore.Run, 384 Close: peer.ZombieDeletion.Chore.Close, 385 }) 386 peer.Debug.Server.Panel.Add( 387 debug.Cycle("Zombie Objects Chore", peer.ZombieDeletion.Chore.Loop)) 388 } 389 390 { // setup accounting 391 peer.Accounting.Tally = tally.New(peer.Log.Named("accounting:tally"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), peer.LiveAccounting.Cache, peer.Metainfo.Metabase, config.Tally) 392 peer.Services.Add(lifecycle.Item{ 393 Name: "accounting:tally", 394 Run: peer.Accounting.Tally.Run, 395 Close: peer.Accounting.Tally.Close, 396 }) 397 peer.Debug.Server.Panel.Add( 398 debug.Cycle("Accounting Tally", peer.Accounting.Tally.Loop)) 399 400 // storage nodes tally 401 peer.Accounting.NodeTally = 
nodetally.New(peer.Log.Named("accounting:nodetally"), peer.DB.StoragenodeAccounting(), peer.Metainfo.SegmentLoop, config.Tally.Interval) 402 peer.Services.Add(lifecycle.Item{ 403 Name: "accounting:nodetally", 404 Run: peer.Accounting.NodeTally.Run, 405 Close: peer.Accounting.NodeTally.Close, 406 }) 407 408 // Lets add 1 more day so we catch any off by one errors when deleting tallies 409 orderExpirationPlusDay := config.Orders.Expiration + config.Rollup.Interval 410 peer.Accounting.Rollup = rollup.New(peer.Log.Named("accounting:rollup"), peer.DB.StoragenodeAccounting(), config.Rollup.Interval, config.Rollup.DeleteTallies, orderExpirationPlusDay) 411 peer.Services.Add(lifecycle.Item{ 412 Name: "accounting:rollup", 413 Run: peer.Accounting.Rollup.Run, 414 Close: peer.Accounting.Rollup.Close, 415 }) 416 peer.Debug.Server.Panel.Add( 417 debug.Cycle("Accounting Rollup", peer.Accounting.Rollup.Loop)) 418 419 peer.Accounting.ProjectBWCleanupChore = projectbwcleanup.NewChore(peer.Log.Named("accounting:chore"), peer.DB.ProjectAccounting(), config.ProjectBWCleanup) 420 peer.Services.Add(lifecycle.Item{ 421 Name: "accounting:project-bw-rollup", 422 Run: peer.Accounting.ProjectBWCleanupChore.Run, 423 Close: peer.Accounting.ProjectBWCleanupChore.Close, 424 }) 425 peer.Debug.Server.Panel.Add( 426 debug.Cycle("Accounting Project Bandwidth Rollup", peer.Accounting.ProjectBWCleanupChore.Loop)) 427 428 if config.RollupArchive.Enabled { 429 peer.Accounting.RollupArchiveChore = rolluparchive.New(peer.Log.Named("accounting:rollup-archive"), peer.DB.StoragenodeAccounting(), peer.DB.ProjectAccounting(), config.RollupArchive) 430 peer.Services.Add(lifecycle.Item{ 431 Name: "accounting:rollup-archive", 432 Run: peer.Accounting.RollupArchiveChore.Run, 433 Close: peer.Accounting.RollupArchiveChore.Close, 434 }) 435 peer.Debug.Server.Panel.Add( 436 debug.Cycle("Accounting Rollup Archive", peer.Accounting.RollupArchiveChore.Loop)) 437 } else { 438 
peer.Log.Named("rolluparchive").Info("disabled") 439 } 440 } 441 442 // TODO: remove in future, should be in API 443 { // setup payments 444 pc := config.Payments 445 446 var stripeClient stripecoinpayments.StripeClient 447 switch pc.Provider { 448 default: 449 stripeClient = stripecoinpayments.NewStripeMock( 450 peer.ID(), 451 peer.DB.StripeCoinPayments().Customers(), 452 peer.DB.Console().Users(), 453 ) 454 case "stripecoinpayments": 455 stripeClient = stripecoinpayments.NewStripeClient(log, pc.StripeCoinPayments) 456 } 457 458 service, err := stripecoinpayments.NewService( 459 peer.Log.Named("payments.stripe:service"), 460 stripeClient, 461 pc.StripeCoinPayments, 462 peer.DB.StripeCoinPayments(), 463 peer.DB.Console().Projects(), 464 peer.DB.ProjectAccounting(), 465 pc.StorageTBPrice, 466 pc.EgressTBPrice, 467 pc.SegmentPrice, 468 pc.BonusRate) 469 if err != nil { 470 return nil, errs.Combine(err, peer.Close()) 471 } 472 473 peer.Payments.Accounts = service.Accounts() 474 475 peer.Payments.Chore = stripecoinpayments.NewChore( 476 peer.Log.Named("payments.stripe:clearing"), 477 service, 478 pc.StripeCoinPayments.TransactionUpdateInterval, 479 pc.StripeCoinPayments.AccountBalanceUpdateInterval, 480 ) 481 peer.Services.Add(lifecycle.Item{ 482 Name: "payments.stripe:service", 483 Run: peer.Payments.Chore.Run, 484 }) 485 peer.Debug.Server.Panel.Add( 486 debug.Cycle("Payments Stripe Transactions", peer.Payments.Chore.TransactionCycle), 487 debug.Cycle("Payments Stripe Account Balance", peer.Payments.Chore.AccountBalanceCycle), 488 ) 489 } 490 491 { // setup graceful exit 492 if config.GracefulExit.Enabled { 493 peer.GracefulExit.Chore = gracefulexit.NewChore(peer.Log.Named("gracefulexit"), peer.DB.GracefulExit(), peer.Overlay.DB, peer.Metainfo.SegmentLoop, config.GracefulExit) 494 peer.Services.Add(lifecycle.Item{ 495 Name: "gracefulexit", 496 Run: peer.GracefulExit.Chore.Run, 497 Close: peer.GracefulExit.Chore.Close, 498 }) 499 peer.Debug.Server.Panel.Add( 500 
debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop)) 501 } else { 502 peer.Log.Named("gracefulexit").Info("disabled") 503 } 504 } 505 506 { // setup metrics service 507 peer.Metrics.Chore = metrics.NewChore( 508 peer.Log.Named("metrics"), 509 config.Metrics, 510 peer.Metainfo.SegmentLoop, 511 ) 512 peer.Services.Add(lifecycle.Item{ 513 Name: "metrics", 514 Run: peer.Metrics.Chore.Run, 515 Close: peer.Metrics.Chore.Close, 516 }) 517 peer.Debug.Server.Panel.Add( 518 debug.Cycle("Metrics", peer.Metrics.Chore.Loop)) 519 } 520 521 return peer, nil 522} 523 524// Run runs satellite until it's either closed or it errors. 525func (peer *Core) Run(ctx context.Context) (err error) { 526 defer mon.Task()(&ctx)(&err) 527 528 group, ctx := errgroup.WithContext(ctx) 529 530 peer.Servers.Run(ctx, group) 531 peer.Services.Run(ctx, group) 532 533 return group.Wait() 534} 535 536// Close closes all the resources. 537func (peer *Core) Close() error { 538 return errs.Combine( 539 peer.Servers.Close(), 540 peer.Services.Close(), 541 ) 542} 543 544// ID returns the peer ID. 545func (peer *Core) ID() storj.NodeID { return peer.Identity.ID } 546