1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package storagenodedb 5 6import ( 7 "context" 8 "database/sql" 9 "time" 10 11 "github.com/zeebo/errs" 12 13 "storj.io/common/storj" 14 "storj.io/private/tagsql" 15 "storj.io/storj/storagenode/storageusage" 16) 17 18// StorageUsageDBName represents the database name. 19const StorageUsageDBName = "storage_usage" 20 21// storageUsageDB storage usage DB. 22type storageUsageDB struct { 23 dbContainerImpl 24} 25 26// Store stores storage usage stamps to db replacing conflicting entries. 27func (db *storageUsageDB) Store(ctx context.Context, stamps []storageusage.Stamp) (err error) { 28 defer mon.Task()(&ctx)(&err) 29 30 if len(stamps) == 0 { 31 return nil 32 } 33 34 query := `INSERT OR REPLACE INTO storage_usage(satellite_id, at_rest_total, interval_start) 35 VALUES(?,?,?)` 36 37 return withTx(ctx, db.GetDB(), func(tx tagsql.Tx) error { 38 for _, stamp := range stamps { 39 _, err = tx.ExecContext(ctx, query, stamp.SatelliteID, stamp.AtRestTotal, stamp.IntervalStart.UTC()) 40 41 if err != nil { 42 return err 43 } 44 } 45 46 return nil 47 }) 48} 49 50// GetDaily returns daily storage usage stamps for particular satellite 51// for provided time range. 52func (db *storageUsageDB) GetDaily(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []storageusage.Stamp, err error) { 53 defer mon.Task()(&ctx)(&err) 54 55 query := `SELECT satellite_id, 56 SUM(at_rest_total), 57 interval_start 58 FROM storage_usage 59 WHERE satellite_id = ? 60 AND ? <= interval_start AND interval_start <= ? 61 GROUP BY DATE(interval_start) 62 ORDER BY interval_start` 63 64 rows, err := db.QueryContext(ctx, query, satelliteID, from.UTC(), to.UTC()) 65 if err != nil { 66 return nil, err 67 } 68 defer func() { err = errs.Combine(err, rows.Close()) }() 69 70 var stamps []storageusage.Stamp 71 for rows.Next() { 72 var satellite storj.NodeID 73 var atRestTotal float64 74 var intervalStart time.Time 75 76 err = rows.Scan(&satellite, &atRestTotal, &intervalStart) 77 if err != nil { 78 return nil, err 79 } 80 81 stamps = append(stamps, storageusage.Stamp{ 82 SatelliteID: satellite, 83 AtRestTotal: atRestTotal, 84 IntervalStart: intervalStart, 85 }) 86 } 87 88 return stamps, rows.Err() 89} 90 91// GetDailyTotal returns daily storage usage stamps summed across all known satellites 92// for provided time range. 93func (db *storageUsageDB) GetDailyTotal(ctx context.Context, from, to time.Time) (_ []storageusage.Stamp, err error) { 94 defer mon.Task()(&ctx)(&err) 95 96 query := `SELECT SUM(at_rest_total), interval_start 97 FROM storage_usage 98 WHERE ? <= interval_start AND interval_start <= ? 99 GROUP BY DATE(interval_start) 100 ORDER BY interval_start` 101 102 rows, err := db.QueryContext(ctx, query, from.UTC(), to.UTC()) 103 if err != nil { 104 return nil, err 105 } 106 defer func() { 107 err = errs.Combine(err, rows.Close()) 108 }() 109 110 var stamps []storageusage.Stamp 111 for rows.Next() { 112 var atRestTotal float64 113 var intervalStart time.Time 114 115 err = rows.Scan(&atRestTotal, &intervalStart) 116 if err != nil { 117 return nil, err 118 } 119 120 stamps = append(stamps, storageusage.Stamp{ 121 AtRestTotal: atRestTotal, 122 IntervalStart: intervalStart, 123 }) 124 } 125 126 return stamps, rows.Err() 127} 128 129// Summary returns aggregated storage usage across all satellites. 130func (db *storageUsageDB) Summary(ctx context.Context, from, to time.Time) (_ float64, err error) { 131 defer mon.Task()(&ctx, from, to)(&err) 132 var summary sql.NullFloat64 133 134 query := `SELECT SUM(at_rest_total) 135 FROM storage_usage 136 WHERE ? <= interval_start AND interval_start <= ?` 137 138 err = db.QueryRowContext(ctx, query, from.UTC(), to.UTC()).Scan(&summary) 139 return summary.Float64, err 140} 141 142// SatelliteSummary returns aggregated storage usage for a particular satellite. 143func (db *storageUsageDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ float64, err error) { 144 defer mon.Task()(&ctx, satelliteID, from, to)(&err) 145 var summary sql.NullFloat64 146 147 query := `SELECT SUM(at_rest_total) 148 FROM storage_usage 149 WHERE satellite_id = ? 150 AND ? <= interval_start AND interval_start <= ?` 151 152 err = db.QueryRowContext(ctx, query, satelliteID, from.UTC(), to.UTC()).Scan(&summary) 153 return summary.Float64, err 154} 155