1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package storagenodedb
5
6import (
7	"context"
8	"database/sql"
9	"time"
10
11	"github.com/zeebo/errs"
12
13	"storj.io/common/storj"
14	"storj.io/private/tagsql"
15	"storj.io/storj/storagenode/storageusage"
16)
17
18// StorageUsageDBName represents the database name.
19const StorageUsageDBName = "storage_usage"
20
21// storageUsageDB storage usage DB.
22type storageUsageDB struct {
23	dbContainerImpl
24}
25
26// Store stores storage usage stamps to db replacing conflicting entries.
27func (db *storageUsageDB) Store(ctx context.Context, stamps []storageusage.Stamp) (err error) {
28	defer mon.Task()(&ctx)(&err)
29
30	if len(stamps) == 0 {
31		return nil
32	}
33
34	query := `INSERT OR REPLACE INTO storage_usage(satellite_id, at_rest_total, interval_start)
35			VALUES(?,?,?)`
36
37	return withTx(ctx, db.GetDB(), func(tx tagsql.Tx) error {
38		for _, stamp := range stamps {
39			_, err = tx.ExecContext(ctx, query, stamp.SatelliteID, stamp.AtRestTotal, stamp.IntervalStart.UTC())
40
41			if err != nil {
42				return err
43			}
44		}
45
46		return nil
47	})
48}
49
50// GetDaily returns daily storage usage stamps for particular satellite
51// for provided time range.
52func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) {
53	defer mon.Task()(&ctx)(&err)
54
55	query := `SELECT satellite_id,
56					SUM(at_rest_total),
57					interval_start
58				FROM storage_usage
59				WHERE satellite_id = ?
60				AND ? <= interval_start AND interval_start <= ?
61				GROUP BY DATE(interval_start)
62				ORDER BY interval_start`
63
64	rows, err := db.QueryContext(ctx, query, satelliteID, from.UTC(), to.UTC())
65	if err != nil {
66		return nil, err
67	}
68	defer func() { err = errs.Combine(err, rows.Close()) }()
69
70	var stamps []storageusage.Stamp
71	for rows.Next() {
72		var satellite storj.NodeID
73		var atRestTotal float64
74		var intervalStart time.Time
75
76		err = rows.Scan(&satellite, &atRestTotal, &intervalStart)
77		if err != nil {
78			return nil, err
79		}
80
81		stamps = append(stamps, storageusage.Stamp{
82			SatelliteID:   satellite,
83			AtRestTotal:   atRestTotal,
84			IntervalStart: intervalStart,
85		})
86	}
87
88	return stamps, rows.Err()
89}
90
91// GetDailyTotal returns daily storage usage stamps summed across all known satellites
92// for provided time range.
93func (db *storageUsageDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []storageusage.Stamp, err error) {
94	defer mon.Task()(&ctx)(&err)
95
96	query := `SELECT SUM(at_rest_total), interval_start
97				FROM storage_usage
98				WHERE ? <= interval_start AND interval_start <= ?
99				GROUP BY DATE(interval_start)
100				ORDER BY interval_start`
101
102	rows, err := db.QueryContext(ctx, query, from.UTC(), to.UTC())
103	if err != nil {
104		return nil, err
105	}
106	defer func() {
107		err = errs.Combine(err, rows.Close())
108	}()
109
110	var stamps []storageusage.Stamp
111	for rows.Next() {
112		var atRestTotal float64
113		var intervalStart time.Time
114
115		err = rows.Scan(&atRestTotal, &intervalStart)
116		if err != nil {
117			return nil, err
118		}
119
120		stamps = append(stamps, storageusage.Stamp{
121			AtRestTotal:   atRestTotal,
122			IntervalStart: intervalStart,
123		})
124	}
125
126	return stamps, rows.Err()
127}
128
129// Summary returns aggregated storage usage across all satellites.
130func (db *storageUsageDB) Summary(ctx context.Context, from, to time.Time) (_ float64, err error) {
131	defer mon.Task()(&ctx, from, to)(&err)
132	var summary sql.NullFloat64
133
134	query := `SELECT SUM(at_rest_total)
135				FROM storage_usage
136				WHERE ? <= interval_start AND interval_start <= ?`
137
138	err = db.QueryRowContext(ctx, query, from.UTC(), to.UTC()).Scan(&summary)
139	return summary.Float64, err
140}
141
142// SatelliteSummary returns aggregated storage usage for a particular satellite.
143func (db *storageUsageDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ float64, err error) {
144	defer mon.Task()(&ctx, satelliteID, from, to)(&err)
145	var summary sql.NullFloat64
146
147	query := `SELECT SUM(at_rest_total)
148				FROM storage_usage
149				WHERE satellite_id = ?
150				AND ? <= interval_start AND interval_start <= ?`
151
152	err = db.QueryRowContext(ctx, query, satelliteID, from.UTC(), to.UTC()).Scan(&summary)
153	return summary.Float64, err
154}
155