1// Copyright (C) 2020 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package storagenodedb
5
6import (
7	"context"
8	"database/sql"
9	"errors"
10
11	"github.com/zeebo/errs"
12
13	"storj.io/common/storj"
14	"storj.io/storj/storagenode/payouts"
15)
16
17// ensures that payoutDB implements payouts.DB interface.
18var _ payouts.DB = (*payoutDB)(nil)
19
20// ErrPayout represents errors from the payouts database.
21var ErrPayout = errs.Class("payouts")
22
23// HeldAmountDBName represents the database name.
24const HeldAmountDBName = "heldamount"
25
26// payoutDB works with node payouts DB.
27type payoutDB struct {
28	dbContainerImpl
29}
30
31// StorePayStub inserts or updates paystub data into the db.
32func (db *payoutDB) StorePayStub(ctx context.Context, paystub payouts.PayStub) (err error) {
33	defer mon.Task()(&ctx)(&err)
34
35	query := `INSERT OR REPLACE INTO paystubs (
36			period,
37			satellite_id,
38			created_at,
39			codes,
40			usage_at_rest,
41			usage_get,
42			usage_put,
43			usage_get_repair,
44			usage_put_repair,
45			usage_get_audit,
46			comp_at_rest,
47			comp_get,
48			comp_put,
49			comp_get_repair,
50			comp_put_repair,
51			comp_get_audit,
52			surge_percent,
53			held,
54			owed,
55			disposed,
56			paid,
57			distributed
58		) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
59
60	_, err = db.ExecContext(ctx, query,
61		paystub.Period,
62		paystub.SatelliteID,
63		paystub.Created,
64		paystub.Codes,
65		paystub.UsageAtRest,
66		paystub.UsageGet,
67		paystub.UsagePut,
68		paystub.UsageGetRepair,
69		paystub.UsagePutRepair,
70		paystub.UsageGetAudit,
71		paystub.CompAtRest,
72		paystub.CompGet,
73		paystub.CompPut,
74		paystub.CompGetRepair,
75		paystub.CompPutRepair,
76		paystub.CompGetAudit,
77		paystub.SurgePercent,
78		paystub.Held,
79		paystub.Owed,
80		paystub.Disposed,
81		paystub.Paid,
82		paystub.Distributed,
83	)
84
85	return ErrPayout.Wrap(err)
86}
87
88// GetPayStub retrieves paystub data for a specific satellite and period.
89func (db *payoutDB) GetPayStub(ctx context.Context, satelliteID storj.NodeID, period string) (_ *payouts.PayStub, err error) {
90	defer mon.Task()(&ctx)(&err)
91
92	result := payouts.PayStub{
93		SatelliteID: satelliteID,
94		Period:      period,
95	}
96
97	rowStub := db.QueryRowContext(ctx,
98		`SELECT created_at,
99			codes,
100			usage_at_rest,
101			usage_get,
102			usage_put,
103			usage_get_repair,
104			usage_put_repair,
105			usage_get_audit,
106			comp_at_rest,
107			comp_get,
108			comp_put,
109			comp_get_repair,
110			comp_put_repair,
111			comp_get_audit,
112			surge_percent,
113			held,
114			owed,
115			disposed,
116			paid,
117			distributed
118		FROM paystubs WHERE satellite_id = ? AND period = ?`,
119		satelliteID, period,
120	)
121
122	err = rowStub.Scan(
123		&result.Created,
124		&result.Codes,
125		&result.UsageAtRest,
126		&result.UsageGet,
127		&result.UsagePut,
128		&result.UsageGetRepair,
129		&result.UsagePutRepair,
130		&result.UsageGetAudit,
131		&result.CompAtRest,
132		&result.CompGet,
133		&result.CompPut,
134		&result.CompGetRepair,
135		&result.CompPutRepair,
136		&result.CompGetAudit,
137		&result.SurgePercent,
138		&result.Held,
139		&result.Owed,
140		&result.Disposed,
141		&result.Paid,
142		&result.Distributed,
143	)
144	if err != nil {
145		if errors.Is(err, sql.ErrNoRows) {
146			return nil, payouts.ErrNoPayStubForPeriod.Wrap(err)
147		}
148		return nil, ErrPayout.Wrap(err)
149	}
150
151	return &result, nil
152}
153
154// AllPayStubs retrieves all paystub stats from DB for specific period.
155func (db *payoutDB) AllPayStubs(ctx context.Context, period string) (_ []payouts.PayStub, err error) {
156	defer mon.Task()(&ctx)(&err)
157
158	query := `SELECT
159			satellite_id,
160			created_at,
161			codes,
162			usage_at_rest,
163			usage_get,
164			usage_put,
165			usage_get_repair,
166			usage_put_repair,
167			usage_get_audit,
168			comp_at_rest,
169			comp_get,
170			comp_put,
171			comp_get_repair,
172			comp_put_repair,
173			comp_get_audit,
174			surge_percent,
175			held,
176			owed,
177			disposed,
178			paid,
179			distributed
180		FROM paystubs WHERE period = ?`
181
182	rows, err := db.QueryContext(ctx, query, period)
183	if err != nil {
184		return nil, err
185	}
186
187	defer func() { err = errs.Combine(err, rows.Close()) }()
188
189	var paystubList []payouts.PayStub
190	for rows.Next() {
191		var paystub payouts.PayStub
192		paystub.Period = period
193
194		err := rows.Scan(&paystub.SatelliteID,
195			&paystub.Created,
196			&paystub.Codes,
197			&paystub.UsageAtRest,
198			&paystub.UsageGet,
199			&paystub.UsagePut,
200			&paystub.UsageGetRepair,
201			&paystub.UsagePutRepair,
202			&paystub.UsageGetAudit,
203			&paystub.CompAtRest,
204			&paystub.CompGet,
205			&paystub.CompPut,
206			&paystub.CompGetRepair,
207			&paystub.CompPutRepair,
208			&paystub.CompGetAudit,
209			&paystub.SurgePercent,
210			&paystub.Held,
211			&paystub.Owed,
212			&paystub.Disposed,
213			&paystub.Paid,
214			&paystub.Distributed,
215		)
216		if err != nil {
217			return nil, ErrPayout.Wrap(err)
218		}
219
220		paystubList = append(paystubList, paystub)
221	}
222	if err = rows.Err(); err != nil {
223		return nil, ErrPayout.Wrap(err)
224	}
225
226	return paystubList, nil
227}
228
229// SatellitesHeldbackHistory retrieves heldback history for specific satellite.
230func (db *payoutDB) SatellitesHeldbackHistory(ctx context.Context, id storj.NodeID) (_ []payouts.HeldForPeriod, err error) {
231	defer mon.Task()(&ctx)(&err)
232
233	query := `SELECT
234			period,
235			held
236		FROM paystubs WHERE satellite_id = ? ORDER BY period ASC`
237
238	rows, err := db.QueryContext(ctx, query, id)
239	if err != nil {
240		return nil, err
241	}
242
243	defer func() { err = errs.Combine(err, rows.Close()) }()
244
245	var heldback []payouts.HeldForPeriod
246	for rows.Next() {
247		var held payouts.HeldForPeriod
248
249		err := rows.Scan(&held.Period, &held.Amount)
250		if err != nil {
251			return nil, ErrPayout.Wrap(err)
252		}
253
254		heldback = append(heldback, held)
255	}
256	if err = rows.Err(); err != nil {
257		return nil, ErrPayout.Wrap(err)
258	}
259
260	return heldback, nil
261}
262
263// SatellitePeriods retrieves all periods for concrete satellite in which we have some payouts data.
264func (db *payoutDB) SatellitePeriods(ctx context.Context, satelliteID storj.NodeID) (_ []string, err error) {
265	defer mon.Task()(&ctx)(&err)
266
267	query := `SELECT distinct period FROM paystubs WHERE satellite_id = ? ORDER BY created_at`
268
269	rows, err := db.QueryContext(ctx, query, satelliteID[:])
270	if err != nil {
271		return nil, ErrPayout.Wrap(err)
272	}
273
274	defer func() { err = errs.Combine(err, rows.Close()) }()
275
276	var periodList []string
277	for rows.Next() {
278		var period string
279		err := rows.Scan(&period)
280		if err != nil {
281			return nil, ErrPayout.Wrap(err)
282		}
283
284		periodList = append(periodList, period)
285	}
286	if err = rows.Err(); err != nil {
287		return nil, ErrPayout.Wrap(err)
288	}
289
290	return periodList, nil
291}
292
293// AllPeriods retrieves all periods in which we have some payouts data.
294func (db *payoutDB) AllPeriods(ctx context.Context) (_ []string, err error) {
295	defer mon.Task()(&ctx)(&err)
296
297	query := `SELECT distinct period FROM paystubs ORDER BY created_at`
298
299	rows, err := db.QueryContext(ctx, query)
300	if err != nil {
301		return nil, err
302	}
303
304	defer func() { err = errs.Combine(err, rows.Close()) }()
305
306	var periodList []string
307	for rows.Next() {
308		var period string
309		err := rows.Scan(&period)
310		if err != nil {
311			return nil, ErrPayout.Wrap(err)
312		}
313
314		periodList = append(periodList, period)
315	}
316	if err = rows.Err(); err != nil {
317		return nil, ErrPayout.Wrap(err)
318	}
319
320	return periodList, nil
321}
322
323// StorePayment inserts or updates payment data into the db.
324func (db *payoutDB) StorePayment(ctx context.Context, payment payouts.Payment) (err error) {
325	defer mon.Task()(&ctx)(&err)
326
327	query := `INSERT OR REPLACE INTO payments (
328			id,
329			created_at,
330			satellite_id,
331			period,
332			amount,
333			receipt,
334			notes
335		) VALUES(?,?,?,?,?,?,?)`
336
337	_, err = db.ExecContext(ctx, query,
338		payment.ID,
339		payment.Created,
340		payment.SatelliteID,
341		payment.Period,
342		payment.Amount,
343		payment.Receipt,
344		payment.Notes,
345	)
346
347	return ErrPayout.Wrap(err)
348}
349
350// SatellitesDisposedHistory returns all disposed amount for specific satellite from DB.
351func (db *payoutDB) SatellitesDisposedHistory(ctx context.Context, satelliteID storj.NodeID) (_ int64, err error) {
352	defer mon.Task()(&ctx)(&err)
353
354	query := `SELECT
355			disposed
356		FROM paystubs WHERE satellite_id = ? ORDER BY period ASC`
357
358	rows, err := db.QueryContext(ctx, query, satelliteID)
359	if err != nil {
360		return 0, err
361	}
362
363	defer func() { err = errs.Combine(err, rows.Close()) }()
364
365	var totalDisposed int64
366	for rows.Next() {
367		var disposed int64
368
369		err := rows.Scan(&disposed)
370		if err != nil {
371			return 0, ErrPayout.Wrap(err)
372		}
373
374		totalDisposed += disposed
375	}
376	if err = rows.Err(); err != nil {
377		return 0, ErrPayout.Wrap(err)
378	}
379
380	return totalDisposed, nil
381}
382
383// GetReceipt retrieves receipt data for a specific satellite and period.
384func (db *payoutDB) GetReceipt(ctx context.Context, satelliteID storj.NodeID, period string) (receipt string, err error) {
385	defer mon.Task()(&ctx)(&err)
386
387	rowPayment := db.QueryRowContext(ctx,
388		`SELECT receipt FROM payments WHERE satellite_id = ? AND period = ?`,
389		satelliteID, period,
390	)
391
392	err = rowPayment.Scan(&receipt)
393	if err != nil {
394		if errors.Is(err, sql.ErrNoRows) {
395			return "", payouts.ErrNoPayStubForPeriod.Wrap(err)
396		}
397		return "", ErrPayout.Wrap(err)
398	}
399
400	return receipt, nil
401}
402
403// GetTotalEarned returns total earned value for node from all paystubs.
404func (db *payoutDB) GetTotalEarned(ctx context.Context) (_ int64, err error) {
405	defer mon.Task()(&ctx)(&err)
406
407	query := `SELECT comp_at_rest, comp_get, comp_get_repair, comp_get_audit FROM paystubs`
408
409	rows, err := db.QueryContext(ctx, query)
410	if err != nil {
411		return 0, err
412	}
413
414	defer func() { err = errs.Combine(err, rows.Close()) }()
415
416	var totalEarned int64
417	for rows.Next() {
418		var compAtRest, compGet, compGetRepair, compGetAudit int64
419
420		err := rows.Scan(&compAtRest, &compGet, &compGetRepair, &compGetAudit)
421		if err != nil {
422			return 0, ErrPayout.Wrap(err)
423		}
424
425		totalEarned += compGetAudit + compGet + compGetRepair + compAtRest
426	}
427	if err = rows.Err(); err != nil {
428		return 0, ErrPayout.Wrap(err)
429	}
430
431	return totalEarned, nil
432}
433
434// GetEarnedAtSatellite returns total earned value for node from specific satellite.
435func (db *payoutDB) GetEarnedAtSatellite(ctx context.Context, id storj.NodeID) (_ int64, err error) {
436	defer mon.Task()(&ctx)(&err)
437
438	query := `SELECT comp_at_rest, comp_get, comp_get_repair, comp_get_audit FROM paystubs WHERE satellite_id = ?`
439
440	rows, err := db.QueryContext(ctx, query, id)
441	if err != nil {
442		return 0, err
443	}
444
445	defer func() { err = errs.Combine(err, rows.Close()) }()
446
447	var totalEarned int64
448	for rows.Next() {
449		var compAtRest, compGet, compGetRepair, compGetAudit int64
450
451		err := rows.Scan(&compAtRest, &compGet, &compGetRepair, &compGetAudit)
452		if err != nil {
453			return 0, ErrPayout.Wrap(err)
454		}
455
456		totalEarned += compGetAudit + compGet + compGetRepair + compAtRest
457	}
458
459	if err = rows.Err(); err != nil {
460		return 0, ErrPayout.Wrap(err)
461	}
462
463	return totalEarned, nil
464}
465
466// GetPayingSatellitesIDs returns list of satellite ID's that ever paid to storagenode.
467func (db *payoutDB) GetPayingSatellitesIDs(ctx context.Context) (_ []storj.NodeID, err error) {
468	defer mon.Task()(&ctx)(&err)
469
470	query := `SELECT DISTINCT (satellite_id) FROM paystubs`
471
472	rows, err := db.QueryContext(ctx, query)
473	if err != nil {
474		return nil, err
475	}
476
477	defer func() { err = errs.Combine(err, rows.Close()) }()
478
479	var satelliteIDs []storj.NodeID
480	for rows.Next() {
481		var satelliteID storj.NodeID
482
483		err := rows.Scan(&satelliteID)
484		if err != nil {
485			if errors.Is(err, sql.ErrNoRows) {
486				return []storj.NodeID{}, nil
487			}
488
489			return nil, ErrPayout.Wrap(err)
490		}
491
492		satelliteIDs = append(satelliteIDs, satelliteID)
493	}
494	if err = rows.Err(); err != nil {
495		return nil, ErrPayout.Wrap(err)
496	}
497
498	return satelliteIDs, nil
499}
500
501// GetSatelliteSummary returns satellite all time paid and held amounts.
502func (db *payoutDB) GetSatelliteSummary(ctx context.Context, satelliteID storj.NodeID) (_, _ int64, err error) {
503	defer mon.Task()(&ctx)(&err)
504
505	query := `SELECT paid, held FROM paystubs WHERE satellite_id = ?`
506
507	rows, err := db.QueryContext(ctx, query, satelliteID)
508	if err != nil {
509		return 0, 0, err
510	}
511
512	defer func() { err = errs.Combine(err, rows.Close()) }()
513
514	var paid, held int64
515	for rows.Next() {
516		var paidPeriod, heldPeriod int64
517
518		err := rows.Scan(&paidPeriod, &heldPeriod)
519		if err != nil {
520			if errors.Is(err, sql.ErrNoRows) {
521				return 0, 0, nil
522			}
523
524			return 0, 0, ErrPayout.Wrap(err)
525		}
526
527		paid += paidPeriod
528		held += heldPeriod
529	}
530	if err = rows.Err(); err != nil {
531		return 0, 0, ErrPayout.Wrap(err)
532	}
533
534	return paid, held, nil
535}
536
537// GetSatellitePeriodSummary returns satellite paid and held amounts for specific period.
538func (db *payoutDB) GetSatellitePeriodSummary(ctx context.Context, satelliteID storj.NodeID, period string) (_, _ int64, err error) {
539	defer mon.Task()(&ctx)(&err)
540
541	query := `SELECT paid, held FROM paystubs WHERE satellite_id = ? AND period = ?`
542
543	rows, err := db.QueryContext(ctx, query, satelliteID, period)
544	if err != nil {
545		return 0, 0, err
546	}
547
548	defer func() { err = errs.Combine(err, rows.Close()) }()
549
550	var paid, held int64
551	for rows.Next() {
552		err := rows.Scan(&paid, &held)
553		if err != nil {
554			if errors.Is(err, sql.ErrNoRows) {
555				return 0, 0, nil
556			}
557
558			return 0, 0, ErrPayout.Wrap(err)
559		}
560	}
561	if err = rows.Err(); err != nil {
562		return 0, 0, ErrPayout.Wrap(err)
563	}
564
565	return paid, held, nil
566}
567
568// GetUndistributed returns total undistributed amount.
569func (db *payoutDB) GetUndistributed(ctx context.Context) (_ int64, err error) {
570	defer mon.Task()(&ctx)(&err)
571
572	var distributed, paid int64
573
574	rowPayment := db.QueryRowContext(ctx,
575		`SELECT COALESCE(SUM(distributed),0), COALESCE(SUM(paid), 0) FROM paystubs`)
576
577	err = rowPayment.Scan(&distributed, &paid)
578	if err != nil {
579		if errors.Is(err, sql.ErrNoRows) {
580			return 0, payouts.ErrNoPayStubForPeriod.Wrap(err)
581		}
582
583		return 0, ErrPayout.Wrap(err)
584	}
585
586	return paid - distributed, nil
587}
588
589// GetSatellitePaystubs returns summed paystubs for specific satellite.
590func (db *payoutDB) GetSatellitePaystubs(ctx context.Context, satelliteID storj.NodeID) (_ *payouts.PayStub, err error) {
591	defer mon.Task()(&ctx)(&err)
592
593	rowPayment := db.QueryRowContext(ctx,
594		`SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0),
595			COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0),
596			COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs WHERE satellite_id = $1`, satelliteID)
597
598	var paystub payouts.PayStub
599
600	err = rowPayment.Scan(
601		&paystub.UsageAtRest,
602		&paystub.UsageGet,
603		&paystub.UsageGetRepair,
604		&paystub.UsageGetAudit,
605		&paystub.CompAtRest,
606		&paystub.CompGet,
607		&paystub.CompGetRepair,
608		&paystub.CompGetAudit,
609		&paystub.Held,
610		&paystub.Paid,
611		&paystub.Distributed,
612		&paystub.Disposed,
613	)
614	if err != nil {
615		return &payouts.PayStub{}, ErrPayout.Wrap(err)
616	}
617
618	return &paystub, nil
619}
620
621// GetPaystubs returns summed all paystubs.
622func (db *payoutDB) GetPaystubs(ctx context.Context) (_ *payouts.PayStub, err error) {
623	defer mon.Task()(&ctx)(&err)
624
625	rowPayment := db.QueryRowContext(ctx,
626		`SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0),
627			COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0),
628			COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs`)
629
630	var paystub payouts.PayStub
631
632	err = rowPayment.Scan(
633		&paystub.UsageAtRest,
634		&paystub.UsageGet,
635		&paystub.UsageGetRepair,
636		&paystub.UsageGetAudit,
637		&paystub.CompAtRest,
638		&paystub.CompGet,
639		&paystub.CompGetRepair,
640		&paystub.CompGetAudit,
641		&paystub.Held,
642		&paystub.Paid,
643		&paystub.Distributed,
644		&paystub.Disposed,
645	)
646	if err != nil {
647		return &payouts.PayStub{}, ErrPayout.Wrap(err)
648	}
649
650	return &paystub, nil
651}
652
653// GetPeriodPaystubs returns all satellites paystubs for specific period.
654func (db *payoutDB) GetPeriodPaystubs(ctx context.Context, period string) (_ *payouts.PayStub, err error) {
655	defer mon.Task()(&ctx)(&err)
656
657	rowPayment := db.QueryRowContext(ctx,
658		`SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0),
659			COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0),
660			COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs WHERE period = $1`, period)
661
662	var paystub payouts.PayStub
663
664	err = rowPayment.Scan(
665		&paystub.UsageAtRest,
666		&paystub.UsageGet,
667		&paystub.UsageGetRepair,
668		&paystub.UsageGetAudit,
669		&paystub.CompAtRest,
670		&paystub.CompGet,
671		&paystub.CompGetRepair,
672		&paystub.CompGetAudit,
673		&paystub.Held,
674		&paystub.Paid,
675		&paystub.Distributed,
676		&paystub.Disposed,
677	)
678	if err != nil {
679		return &payouts.PayStub{}, ErrPayout.Wrap(err)
680	}
681
682	return &paystub, nil
683}
684
685// GetSatellitePeriodPaystubs returns summed satellite paystubs for specific period.
686func (db *payoutDB) GetSatellitePeriodPaystubs(ctx context.Context, period string, satelliteID storj.NodeID) (_ *payouts.PayStub, err error) {
687	defer mon.Task()(&ctx)(&err)
688
689	rowPayment := db.QueryRowContext(ctx,
690		`SELECT COALESCE(SUM(usage_at_rest),0), COALESCE(SUM(usage_get),0), COALESCE(SUM(usage_get_repair),0), COALESCE(SUM(usage_get_audit),0),
691			COALESCE(SUM(comp_at_rest),0), COALESCE(SUM(comp_get),0), COALESCE(SUM(comp_get_repair),0), COALESCE(SUM(comp_get_audit),0),
692			COALESCE(SUM(held),0), COALESCE(SUM(paid),0), COALESCE(SUM(distributed),0), COALESCE(SUM(disposed),0) from paystubs WHERE period = $1 AND satellite_id = $2`, period, satelliteID)
693
694	var paystub payouts.PayStub
695
696	err = rowPayment.Scan(
697		&paystub.UsageAtRest,
698		&paystub.UsageGet,
699		&paystub.UsageGetRepair,
700		&paystub.UsageGetAudit,
701		&paystub.CompAtRest,
702		&paystub.CompGet,
703		&paystub.CompGetRepair,
704		&paystub.CompGetAudit,
705		&paystub.Held,
706		&paystub.Paid,
707		&paystub.Distributed,
708		&paystub.Disposed,
709	)
710	if err != nil {
711		return &payouts.PayStub{}, ErrPayout.Wrap(err)
712	}
713
714	return &paystub, nil
715}
716
717// HeldAmountHistory retrieves held amount history for all satellites.
718func (db *payoutDB) HeldAmountHistory(ctx context.Context) (_ []payouts.HeldAmountHistory, err error) {
719	defer mon.Task()(&ctx)(&err)
720
721	query := `
722		SELECT
723			satellite_id,
724			period,
725			held
726		FROM paystubs
727		ORDER BY satellite_id, period ASC`
728
729	rows, err := db.QueryContext(ctx, query)
730	if err != nil {
731		return nil, err
732	}
733	defer func() {
734		err = errs.Combine(err, rows.Close())
735	}()
736
737	cache := make(map[storj.NodeID]payouts.HeldAmountHistory)
738
739	for rows.Next() {
740		var idBytes []byte
741		var held payouts.HeldForPeriod
742
743		err := rows.Scan(&idBytes, &held.Period, &held.Amount)
744		if err != nil {
745			return nil, ErrPayout.Wrap(err)
746		}
747
748		satelliteID, err := storj.NodeIDFromBytes(idBytes)
749		if err != nil {
750			return nil, ErrPayout.Wrap(err)
751		}
752
753		satelliteHeldHistory := cache[satelliteID]
754		satelliteHeldHistory.HeldAmounts = append(satelliteHeldHistory.HeldAmounts, held)
755		cache[satelliteID] = satelliteHeldHistory
756	}
757	if err = rows.Err(); err != nil {
758		return nil, ErrPayout.Wrap(err)
759	}
760
761	var heldHistories []payouts.HeldAmountHistory
762	for satelliteID, heldHistory := range cache {
763		heldHistory.SatelliteID = satelliteID
764		heldHistories = append(heldHistories, heldHistory)
765	}
766
767	return heldHistories, nil
768}
769