1// Copyright (C) 2020 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package storagenodedb 5 6import ( 7 "context" 8 "database/sql" 9 "errors" 10 11 "github.com/zeebo/errs" 12 13 "storj.io/common/storj" 14 "storj.io/storj/storagenode/payouts" 15) 16 17// ensures that payoutDB implements payouts.DB interface. 18var _ payouts.DB = (*payoutDB)(nil) 19 20// ErrPayout represents errors from the payouts database. 21var ErrPayout = errs.Class("payouts") 22 23// HeldAmountDBName represents the database name. 24const HeldAmountDBName = "heldamount" 25 26// payoutDB works with node payouts DB. 27type payoutDB struct { 28 dbContainerImpl 29} 30 31// StorePayStub inserts or updates paystub data into the db. 32func (db *payoutDB) StorePayStub(ctx context.Context, paystub payouts.PayStub) (err error) { 33 defer mon.Task()(&ctx)(&err) 34 35 query := `INSERT OR REPLACE INTO paystubs ( 36 period, 37 satellite_id, 38 created_at, 39 codes, 40 usage_at_rest, 41 usage_get, 42 usage_put, 43 usage_get_repair, 44 usage_put_repair, 45 usage_get_audit, 46 comp_at_rest, 47 comp_get, 48 comp_put, 49 comp_get_repair, 50 comp_put_repair, 51 comp_get_audit, 52 surge_percent, 53 held, 54 owed, 55 disposed, 56 paid, 57 distributed 58 ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)` 59 60 _, err = db.ExecContext(ctx, query, 61 paystub.Period, 62 paystub.SatelliteID, 63 paystub.Created, 64 paystub.Codes, 65 paystub.UsageAtRest, 66 paystub.UsageGet, 67 paystub.UsagePut, 68 paystub.UsageGetRepair, 69 paystub.UsagePutRepair, 70 paystub.UsageGetAudit, 71 paystub.CompAtRest, 72 paystub.CompGet, 73 paystub.CompPut, 74 paystub.CompGetRepair, 75 paystub.CompPutRepair, 76 paystub.CompGetAudit, 77 paystub.SurgePercent, 78 paystub.Held, 79 paystub.Owed, 80 paystub.Disposed, 81 paystub.Paid, 82 paystub.Distributed, 83 ) 84 85 return ErrPayout.Wrap(err) 86} 87 88// GetPayStub retrieves paystub data for a specific satellite and period. 89func (db *payoutDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID, period string) (_ *payouts.PayStub, err error) { 90 defer mon.Task()(&ctx)(&err) 91 92 result := payouts.PayStub{ 93 SatelliteID: satelliteID, 94 Period: period, 95 } 96 97 rowStub := db.QueryRowContext(ctx, 98 `SELECT created_at, 99 codes, 100 usage_at_rest, 101 usage_get, 102 usage_put, 103 usage_get_repair, 104 usage_put_repair, 105 usage_get_audit, 106 comp_at_rest, 107 comp_get, 108 comp_put, 109 comp_get_repair, 110 comp_put_repair, 111 comp_get_audit, 112 surge_percent, 113 held, 114 owed, 115 disposed, 116 paid, 117 distributed 118 FROM paystubs WHERE satellite_id = ? AND period = ?`, 119 satelliteID, period, 120 ) 121 122 err = rowStub.Scan( 123 &result.Created, 124 &result.Codes, 125 &result.UsageAtRest, 126 &result.UsageGet, 127 &result.UsagePut, 128 &result.UsageGetRepair, 129 &result.UsagePutRepair, 130 &result.UsageGetAudit, 131 &result.CompAtRest, 132 &result.CompGet, 133 &result.CompPut, 134 &result.CompGetRepair, 135 &result.CompPutRepair, 136 &result.CompGetAudit, 137 &result.SurgePercent, 138 &result.Held, 139 &result.Owed, 140 &result.Disposed, 141 &result.Paid, 142 &result.Distributed, 143 ) 144 if err != nil { 145 if errors.Is(err, sql.ErrNoRows) { 146 return nil, payouts.ErrNoPayStubForPeriod.Wrap(err) 147 } 148 return nil, ErrPayout.Wrap(err) 149 } 150 151 return &result, nil 152} 153 154// AllPayStubs retrieves all paystub stats from DB for specific period. 155func (db *payoutDB) AllPayStubs(ctx context.Context, period string) (_ []payouts.PayStub, err error) { 156 defer mon.Task()(&ctx)(&err) 157 158 query := `SELECT 159 satellite_id, 160 created_at, 161 codes, 162 usage_at_rest, 163 usage_get, 164 usage_put, 165 usage_get_repair, 166 usage_put_repair, 167 usage_get_audit, 168 comp_at_rest, 169 comp_get, 170 comp_put, 171 comp_get_repair, 172 comp_put_repair, 173 comp_get_audit, 174 surge_percent, 175 held, 176 owed, 177 disposed, 178 paid, 179 distributed 180 FROM paystubs WHERE period = ?` 181 182 rows, err := db.QueryContext(ctx, query, period) 183 if err != nil { 184 return nil, err 185 } 186 187 defer func() { err = errs.Combine(err, rows.Close()) }() 188 189 var paystubList []payouts.PayStub 190 for rows.Next() { 191 var paystub payouts.PayStub 192 paystub.Period = period 193 194 err := rows.Scan(&paystub.SatelliteID, 195 &paystub.Created, 196 &paystub.Codes, 197 &paystub.UsageAtRest, 198 &paystub.UsageGet, 199 &paystub.UsagePut, 200 &paystub.UsageGetRepair, 201 &paystub.UsagePutRepair, 202 &paystub.UsageGetAudit, 203 &paystub.CompAtRest, 204 &paystub.CompGet, 205 &paystub.CompPut, 206 &paystub.CompGetRepair, 207 &paystub.CompPutRepair, 208 &paystub.CompGetAudit, 209 &paystub.SurgePercent, 210 &paystub.Held, 211 &paystub.Owed, 212 &paystub.Disposed, 213 &paystub.Paid, 214 &paystub.Distributed, 215 ) 216 if err != nil { 217 return nil, ErrPayout.Wrap(err) 218 } 219 220 paystubList = append(paystubList, paystub) 221 } 222 if err = rows.Err(); err != nil { 223 return nil, ErrPayout.Wrap(err) 224 } 225 226 return paystubList, nil 227} 228 229// SatellitesHeldbackHistory retrieves heldback history for specific satellite. 230func (db *payoutDB) SatellitesHeldbackHistory(ctx context.Context, id storj.NodeID) (_ []payouts.HeldForPeriod, err error) { 231 defer mon.Task()(&ctx)(&err) 232 233 query := `SELECT 234 period, 235 held 236 FROM paystubs WHERE satellite_id = ? ORDER BY period ASC` 237 238 rows, err := db.QueryContext(ctx, query, id) 239 if err != nil { 240 return nil, err 241 } 242 243 defer func() { err = errs.Combine(err, rows.Close()) }() 244 245 var heldback []payouts.HeldForPeriod 246 for rows.Next() { 247 var held payouts.HeldForPeriod 248 249 err := rows.Scan(&held.Period, &held.Amount) 250 if err != nil { 251 return nil, ErrPayout.Wrap(err) 252 } 253 254 heldback = append(heldback, held) 255 } 256 if err = rows.Err(); err != nil { 257 return nil, ErrPayout.Wrap(err) 258 } 259 260 return heldback, nil 261} 262 263// SatellitePeriods retrieves all periods for concrete satellite in which we have some payouts data. 264func (db *payoutDB) SatellitePeriods(ctx context.Context, satelliteID storj.NodeID) (_ []string, err error) { 265 defer mon.Task()(&ctx)(&err) 266 267 query := `SELECT distinct period FROM paystubs WHERE satellite_id = ? ORDER BY created_at` 268 269 rows, err := db.QueryContext(ctx, query, satelliteID[:]) 270 if err != nil { 271 return nil, ErrPayout.Wrap(err) 272 } 273 274 defer func() { err = errs.Combine(err, rows.Close()) }() 275 276 var periodList []string 277 for rows.Next() { 278 var period string 279 err := rows.Scan(&period) 280 if err != nil { 281 return nil, ErrPayout.Wrap(err) 282 } 283 284 periodList = append(periodList, period) 285 } 286 if err = rows.Err(); err != nil { 287 return nil, ErrPayout.Wrap(err) 288 } 289 290 return periodList, nil 291} 292 293// AllPeriods retrieves all periods in which we have some payouts data. 294func (db *payoutDB) AllPeriods(ctx context.Context) (_ []string, err error) { 295 defer mon.Task()(&ctx)(&err) 296 297 query := `SELECT distinct period FROM paystubs ORDER BY created_at` 298 299 rows, err := db.QueryContext(ctx, query) 300 if err != nil { 301 return nil, err 302 } 303 304 defer func() { err = errs.Combine(err, rows.Close()) }() 305 306 var periodList []string 307 for rows.Next() { 308 var period string 309 err := rows.Scan(&period) 310 if err != nil { 311 return nil, ErrPayout.Wrap(err) 312 } 313 314 periodList = append(periodList, period) 315 } 316 if err = rows.Err(); err != nil { 317 return nil, ErrPayout.Wrap(err) 318 } 319 320 return periodList, nil 321} 322 323// StorePayment inserts or updates payment data into the db. 324func (db *payoutDB) StorePayment(ctx context.Context, payment payouts.Payment) (err error) { 325 defer mon.Task()(&ctx)(&err) 326 327 query := `INSERT OR REPLACE INTO payments ( 328 id, 329 created_at, 330 satellite_id, 331 period, 332 amount, 333 receipt, 334 notes 335 ) VALUES(?,?,?,?,?,?,?)` 336 337 _, err = db.ExecContext(ctx, query, 338 payment.ID, 339 payment.Created, 340 payment.SatelliteID, 341 payment.Period, 342 payment.Amount, 343 payment.Receipt, 344 payment.Notes, 345 ) 346 347 return ErrPayout.Wrap(err) 348} 349 350// SatellitesDisposedHistory returns all disposed amount for specific satellite from DB. 351func (db *payoutDB) SatellitesDisposedHistory(ctx context.Context, satelliteID storj.NodeID) (_ int64, err error) { 352 defer mon.Task()(&ctx)(&err) 353 354 query := `SELECT 355 disposed 356 FROM paystubs WHERE satellite_id = ? ORDER BY period ASC` 357 358 rows, err := db.QueryContext(ctx, query, satelliteID) 359 if err != nil { 360 return 0, err 361 } 362 363 defer func() { err = errs.Combine(err, rows.Close()) }() 364 365 var totalDisposed int64 366 for rows.Next() { 367 var disposed int64 368 369 err := rows.Scan(&disposed) 370 if err != nil { 371 return 0, ErrPayout.Wrap(err) 372 } 373 374 totalDisposed += disposed 375 } 376 if err = rows.Err(); err != nil { 377 return 0, ErrPayout.Wrap(err) 378 } 379 380 return totalDisposed, nil 381} 382 383// GetReceipt retrieves receipt data for a specific satellite and period. 384func (db *payoutDB) GetReceipt(ctx context.Context, satelliteID storj.NodeID, period string) (receipt string, err error) { 385 defer mon.Task()(&ctx)(&err) 386 387 rowPayment := db.QueryRowContext(ctx, 388 `SELECT receipt FROM payments WHERE satellite_id = ? AND period = ?`, 389 satelliteID, period, 390 ) 391 392 err = rowPayment.Scan(&receipt) 393 if err != nil { 394 if errors.Is(err, sql.ErrNoRows) { 395 return "", payouts.ErrNoPayStubForPeriod.Wrap(err) 396 } 397 return "", ErrPayout.Wrap(err) 398 } 399 400 return receipt, nil 401} 402 403// GetTotalEarned returns total earned value for node from all paystubs. 404func (db *payoutDB) GetTotalEarned(ctx context.Context) (_ int64, err error) { 405 defer mon.Task()(&ctx)(&err) 406 407 query := `SELECT comp_at_rest, comp_get, comp_get_repair, comp_get_audit FROM paystubs` 408 409 rows, err := db.QueryContext(ctx, query) 410 if err != nil { 411 return 0, err 412 } 413 414 defer func() { err = errs.Combine(err, rows.Close()) }() 415 416 var totalEarned int64 417 for rows.Next() { 418 var compAtRest, compGet, compGetRepair, compGetAudit int64 419 420 err := rows.Scan(&compAtRest, &compGet, &compGetRepair, &compGetAudit) 421 if err != nil { 422 return 0, ErrPayout.Wrap(err) 423 } 424 425 totalEarned += compGetAudit + compGet + compGetRepair + compAtRest 426 } 427 if err = rows.Err(); err != nil { 428 return 0, ErrPayout.Wrap(err) 429 } 430 431 return totalEarned, nil 432} 433 434// GetEarnedAtSatellite returns total earned value for node from specific satellite. 435func (db *payoutDB) GetEarnedAtSatellite(ctx context.Context, id storj.NodeID) (_ int64, err error) { 436 defer mon.Task()(&ctx)(&err) 437 438 query := `SELECT comp_at_rest, comp_get, comp_get_repair, comp_get_audit FROM paystubs WHERE satellite_id = ?` 439 440 rows, err := db.QueryContext(ctx, query, id) 441 if err != nil { 442 return 0, err 443 } 444 445 defer func() { err = errs.Combine(err, rows.Close()) }() 446 447 var totalEarned int64 448 for rows.Next() { 449 var compAtRest, compGet, compGetRepair, compGetAudit int64 450 451 err := rows.Scan(&compAtRest, &compGet, &compGetRepair, &compGetAudit) 452 if err != nil { 453 return 0, ErrPayout.Wrap(err) 454 } 455 456 totalEarned += compGetAudit + compGet + compGetRepair + compAtRest 457 } 458 459 if err = rows.Err(); err != nil { 460 return 0, ErrPayout.Wrap(err) 461 } 462 463 return totalEarned, nil 464} 465 466// GetPayingSatellitesIDs returns list of satellite ID's that ever paid to storagenode. 467func (db *payoutDB) GetPayingSatellitesIDs(ctx context.Context) (_ []storj.NodeID, err error) { 468 defer mon.Task()(&ctx)(&err) 469 470 query := `SELECT DISTINCT (satellite_id) FROM paystubs` 471 472 rows, err := db.QueryContext(ctx, query) 473 if err != nil { 474 return nil, err 475 } 476 477 defer func() { err = errs.Combine(err, rows.Close()) }() 478 479 var satelliteIDs []storj.NodeID 480 for rows.Next() { 481 var satelliteID storj.NodeID 482 483 err := rows.Scan(&satelliteID) 484 if err != nil { 485 if errors.Is(err, sql.ErrNoRows) { 486 return []storj.NodeID{}, nil 487 } 488 489 return nil, ErrPayout.Wrap(err) 490 } 491 492 satelliteIDs = append(satelliteIDs, satelliteID) 493 } 494 if err = rows.Err(); err != nil { 495 return nil, ErrPayout.Wrap(err) 496 } 497 498 return satelliteIDs, nil 499} 500 501// GetSatelliteSummary returns satellite all time paid and held amounts. 502func (db *payoutDB) GetSatelliteSummary(ctx context.Context, satelliteID storj.NodeID) (_, _ int64, err error) { 503 defer mon.Task()(&ctx)(&err) 504 505 query := `SELECT paid, held FROM paystubs WHERE satellite_id = ?` 506 507 rows, err := db.QueryContext(ctx, query, satelliteID) 508 if err != nil { 509 return 0, 0, err 510 } 511 512 defer func() { err = errs.Combine(err, rows.Close()) }() 513 514 var paid, held int64 515 for rows.Next() { 516 var paidPeriod, heldPeriod int64 517 518 err := rows.Scan(&paidPeriod, &heldPeriod) 519 if err != nil { 520 if errors.Is(err, sql.ErrNoRows) { 521 return 0, 0, nil 522 } 523 524 return 0, 0, ErrPayout.Wrap(err) 525 } 526 527 paid += paidPeriod 528 held += heldPeriod 529 } 530 if err = rows.Err(); err != nil { 531 return 0, 0, ErrPayout.Wrap(err) 532 } 533 534 return paid, held, nil 535} 536 537// GetSatellitePeriodSummary returns satellite paid and held amounts for specific period. 538func (db *payoutDB) GetSatellitePeriodSummary(ctx context.Context, satelliteID storj.NodeID, period string) (_, _ int64, err error) { 539 defer mon.Task()(&ctx)(&err) 540 541 query := `SELECT paid, held FROM paystubs WHERE satellite_id = ? AND period = ?` 542 543 rows, err := db.QueryContext(ctx, query, satelliteID, period) 544 if err != nil { 545 return 0, 0, err 546 } 547 548 defer func() { err = errs.Combine(err, rows.Close()) }() 549 550 var paid, held int64 551 for rows.Next() { 552 err := rows.Scan(&paid, &held) 553 if err != nil { 554 if errors.Is(err, sql.ErrNoRows) { 555 return 0, 0, nil 556 } 557 558 return 0, 0, ErrPayout.Wrap(err) 559 } 560 } 561 if err = rows.Err(); err != nil { 562 return 0, 0, ErrPayout.Wrap(err) 563 } 564 565 return paid, held, nil 566} 567 568// GetUndistributed returns total undistributed amount. 569func (db *payoutDB) GetUndistributed(ctx context.Context) (_ int64, err error) { 570 defer mon.Task()(&ctx)(&err) 571 572 var distributed, paid int64 573 574 rowPayment := db.QueryRowContext(ctx, 575 `SELECT COALESCE(SUM(distributed),0), COALESCE(SUM(paid), 0) FROM paystubs`) 576 577 err = rowPayment.Scan(&distributed, &paid) 578 if err != nil { 579 if errors.Is(err, sql.ErrNoRows) { 580 return 0, payouts.ErrNoPayStubForPeriod.Wrap(err) 581 } 582 583 return 0, ErrPayout.Wrap(err) 584 } 585 586 return paid - distributed, nil 587} 588 589// GetSatellitePaystubs returns summed paystubs for specific satellite. 590func (db *payoutDB) GetSatellitePaystubs(ctx context.Context, satelliteID storj.NodeID) (_ *payouts.PayStub, err error) { 591 defer mon.Task()(&ctx)(&err) 592 593 rowPayment := db.QueryRowContext(ctx, 594 `SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0), 595 COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0), 596 COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs WHERE satellite_id = $1`, satelliteID) 597 598 var paystub payouts.PayStub 599 600 err = rowPayment.Scan( 601 &paystub.UsageAtRest, 602 &paystub.UsageGet, 603 &paystub.UsageGetRepair, 604 &paystub.UsageGetAudit, 605 &paystub.CompAtRest, 606 &paystub.CompGet, 607 &paystub.CompGetRepair, 608 &paystub.CompGetAudit, 609 &paystub.Held, 610 &paystub.Paid, 611 &paystub.Distributed, 612 &paystub.Disposed, 613 ) 614 if err != nil { 615 return &payouts.PayStub{}, ErrPayout.Wrap(err) 616 } 617 618 return &paystub, nil 619} 620 621// GetPaystubs returns summed all paystubs. 622func (db *payoutDB) GetPaystubs(ctx context.Context) (_ *payouts.PayStub, err error) { 623 defer mon.Task()(&ctx)(&err) 624 625 rowPayment := db.QueryRowContext(ctx, 626 `SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0), 627 COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0), 628 COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs`) 629 630 var paystub payouts.PayStub 631 632 err = rowPayment.Scan( 633 &paystub.UsageAtRest, 634 &paystub.UsageGet, 635 &paystub.UsageGetRepair, 636 &paystub.UsageGetAudit, 637 &paystub.CompAtRest, 638 &paystub.CompGet, 639 &paystub.CompGetRepair, 640 &paystub.CompGetAudit, 641 &paystub.Held, 642 &paystub.Paid, 643 &paystub.Distributed, 644 &paystub.Disposed, 645 ) 646 if err != nil { 647 return &payouts.PayStub{}, ErrPayout.Wrap(err) 648 } 649 650 return &paystub, nil 651} 652 653// GetPeriodPaystubs returns all satellites paystubs for specific period. 654func (db *payoutDB) GetPeriodPaystubs(ctx context.Context, period string) (_ *payouts.PayStub, err error) { 655 defer mon.Task()(&ctx)(&err) 656 657 rowPayment := db.QueryRowContext(ctx, 658 `SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0), 659 COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0), 660 COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs WHERE period = $1`, period) 661 662 var paystub payouts.PayStub 663 664 err = rowPayment.Scan( 665 &paystub.UsageAtRest, 666 &paystub.UsageGet, 667 &paystub.UsageGetRepair, 668 &paystub.UsageGetAudit, 669 &paystub.CompAtRest, 670 &paystub.CompGet, 671 &paystub.CompGetRepair, 672 &paystub.CompGetAudit, 673 &paystub.Held, 674 &paystub.Paid, 675 &paystub.Distributed, 676 &paystub.Disposed, 677 ) 678 if err != nil { 679 return &payouts.PayStub{}, ErrPayout.Wrap(err) 680 } 681 682 return &paystub, nil 683} 684 685// GetSatellitePeriodPaystubs returns summed satellite paystubs for specific period. 686func (db *payoutDB) GetSatellitePeriodPaystubs(ctx context.Context, period string, satelliteID storj.NodeID) (_ *payouts.PayStub, err error) { 687 defer mon.Task()(&ctx)(&err) 688 689 rowPayment := db.QueryRowContext(ctx, 690 `SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0), 691 COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0), 692 COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs WHERE period = $1 AND satellite_id = $2`, period, satelliteID) 693 694 var paystub payouts.PayStub 695 696 err = rowPayment.Scan( 697 &paystub.UsageAtRest, 698 &paystub.UsageGet, 699 &paystub.UsageGetRepair, 700 &paystub.UsageGetAudit, 701 &paystub.CompAtRest, 702 &paystub.CompGet, 703 &paystub.CompGetRepair, 704 &paystub.CompGetAudit, 705 &paystub.Held, 706 &paystub.Paid, 707 &paystub.Distributed, 708 &paystub.Disposed, 709 ) 710 if err != nil { 711 return &payouts.PayStub{}, ErrPayout.Wrap(err) 712 } 713 714 return &paystub, nil 715} 716 717// HeldAmountHistory retrieves held amount history for all satellites. 718func (db *payoutDB) HeldAmountHistory(ctx context.Context) (_ []payouts.HeldAmountHistory, err error) { 719 defer mon.Task()(&ctx)(&err) 720 721 query := ` 722 SELECT 723 satellite_id, 724 period, 725 held 726 FROM paystubs 727 ORDER BY satellite_id, period ASC` 728 729 rows, err := db.QueryContext(ctx, query) 730 if err != nil { 731 return nil, err 732 } 733 defer func() { 734 err = errs.Combine(err, rows.Close()) 735 }() 736 737 cache := make(map[storj.NodeID]payouts.HeldAmountHistory) 738 739 for rows.Next() { 740 var idBytes []byte 741 var held payouts.HeldForPeriod 742 743 err := rows.Scan(&idBytes, &held.Period, &held.Amount) 744 if err != nil { 745 return nil, ErrPayout.Wrap(err) 746 } 747 748 satelliteID, err := storj.NodeIDFromBytes(idBytes) 749 if err != nil { 750 return nil, ErrPayout.Wrap(err) 751 } 752 753 satelliteHeldHistory := cache[satelliteID] 754 satelliteHeldHistory.HeldAmounts = append(satelliteHeldHistory.HeldAmounts, held) 755 cache[satelliteID] = satelliteHeldHistory 756 } 757 if err = rows.Err(); err != nil { 758 return nil, ErrPayout.Wrap(err) 759 } 760 761 var heldHistories []payouts.HeldAmountHistory 762 for satelliteID, heldHistory := range cache { 763 heldHistory.SatelliteID = satelliteID 764 heldHistories = append(heldHistories, heldHistory) 765 } 766 767 return heldHistories, nil 768} 769