1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package storagenodedb
5
6import (
7	"context"
8	"database/sql"
9	"errors"
10	"sync"
11	"time"
12
13	"github.com/zeebo/errs"
14
15	"storj.io/common/pb"
16	"storj.io/common/storj"
17	"storj.io/private/dbutil"
18	"storj.io/storj/private/date"
19	"storj.io/storj/storagenode/bandwidth"
20)
21
// ErrBandwidth is the error class used to wrap errors coming from the
// bandwidthdb database.
var ErrBandwidth = errs.Class("bandwidthdb")

// BandwidthDBName represents the database name.
const BandwidthDBName = "bandwidth"
27
// bandwidthDB records bandwidth usage and keeps an in-memory cache of the
// usage total for the month that begins at usedSince.
type bandwidthDB struct {
	// Moved to top of struct to resolve alignment issue with atomic operations on ARM
	//
	// usedSpace is the cached bandwidth total for the month starting at
	// usedSince; Add refreshes it and MonthSummary reads it.
	usedSpace int64
	// usedMu guards usedSpace and usedSince.
	usedMu    sync.RWMutex
	// usedSince is the beginning of the month that usedSpace was computed for.
	usedSince time.Time

	dbContainerImpl
}
36
37// Add adds bandwidth usage to the table.
38func (db *bandwidthDB) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) (err error) {
39	defer mon.Task()(&ctx)(&err)
40	_, err = db.ExecContext(ctx, `
41		INSERT INTO
42			bandwidth_usage(satellite_id, action, amount, created_at)
43		VALUES(?, ?, ?, datetime(?))`, satelliteID, action, amount, created.UTC())
44	if err == nil {
45		db.usedMu.Lock()
46		defer db.usedMu.Unlock()
47
48		beginningOfMonth := getBeginningOfMonth(created.UTC())
49		if beginningOfMonth.Equal(db.usedSince) {
50			db.usedSpace += amount
51		} else if beginningOfMonth.After(db.usedSince) {
52			usage, err := db.Summary(ctx, beginningOfMonth, time.Now())
53			if err != nil {
54				return err
55			}
56			db.usedSince = beginningOfMonth
57			db.usedSpace = usage.Total()
58		}
59	}
60	return ErrBandwidth.Wrap(err)
61}
62
63// MonthSummary returns summary of the current months bandwidth usages.
64func (db *bandwidthDB) MonthSummary(ctx context.Context, now time.Time) (_ int64, err error) {
65	defer mon.Task()(&ctx)(&err)
66
67	db.usedMu.RLock()
68	beginningOfMonth := getBeginningOfMonth(now)
69	if beginningOfMonth.Equal(db.usedSince) {
70		defer db.usedMu.RUnlock()
71		return db.usedSpace, nil
72	}
73	db.usedMu.RUnlock()
74
75	usage, err := db.Summary(ctx, beginningOfMonth, now)
76	if err != nil {
77		return 0, err
78	}
79	// Just return the usage, don't update the cache. Let add handle updates
80	return usage.Total(), nil
81}
82
83// actionFilter sums bandwidth depending on piece action type.
84type actionFilter func(action pb.PieceAction, amount int64, usage *bandwidth.Usage)
85
86var (
87	// ingressFilter sums put and put repair.
88	ingressFilter actionFilter = func(action pb.PieceAction, amount int64, usage *bandwidth.Usage) {
89		switch action {
90		case pb.PieceAction_PUT, pb.PieceAction_PUT_REPAIR:
91			usage.Include(action, amount)
92		}
93	}
94
95	// egressFilter sums get, get audit and get repair.
96	egressFilter actionFilter = func(action pb.PieceAction, amount int64, usage *bandwidth.Usage) {
97		switch action {
98		case pb.PieceAction_GET, pb.PieceAction_GET_AUDIT, pb.PieceAction_GET_REPAIR:
99			usage.Include(action, amount)
100		}
101	}
102
103	// bandwidthFilter sums all bandwidth.
104	bandwidthFilter actionFilter = func(action pb.PieceAction, amount int64, usage *bandwidth.Usage) {
105		usage.Include(action, amount)
106	}
107)
108
// Summary returns summary of bandwidth usages for all satellites between from
// and to. It delegates to getSummary with bandwidthFilter, so every piece
// action is counted.
func (db *bandwidthDB) Summary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) {
	defer mon.Task()(&ctx)(&err)

	return db.getSummary(ctx, from, to, bandwidthFilter)
}
115
// EgressSummary returns summary of egress usages for all satellites between
// from and to. It delegates to getSummary with egressFilter, so only get,
// get audit and get repair actions are counted.
func (db *bandwidthDB) EgressSummary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) {
	defer mon.Task()(&ctx)(&err)

	return db.getSummary(ctx, from, to, egressFilter)
}
122
// IngressSummary returns summary of ingress usages for all satellites between
// from and to. It delegates to getSummary with ingressFilter, so only put and
// put repair actions are counted.
func (db *bandwidthDB) IngressSummary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) {
	defer mon.Task()(&ctx)(&err)

	return db.getSummary(ctx, from, to, ingressFilter)
}
129
130// getSummary returns bandwidth data for all satellites.
131func (db *bandwidthDB) getSummary(ctx context.Context, from, to time.Time, filter actionFilter) (_ *bandwidth.Usage, err error) {
132	defer mon.Task()(&ctx)(&err)
133
134	usage := &bandwidth.Usage{}
135
136	from, to = from.UTC(), to.UTC()
137
138	rows, err := db.QueryContext(ctx, `
139		SELECT action, sum(a) amount from(
140				SELECT action, sum(amount) a
141				FROM bandwidth_usage
142				WHERE datetime(?) <= created_at AND created_at <= datetime(?)
143				GROUP BY action
144				UNION ALL
145				SELECT action, sum(amount) a
146				FROM bandwidth_usage_rollups
147				WHERE datetime(?) <= interval_start AND interval_start <= datetime(?)
148				GROUP BY action
149		) GROUP BY action;
150		`, from, to, from, to)
151	if err != nil {
152		if errors.Is(err, sql.ErrNoRows) {
153			return usage, nil
154		}
155		return nil, ErrBandwidth.Wrap(err)
156	}
157	defer func() { err = errs.Combine(err, rows.Close()) }()
158
159	for rows.Next() {
160		var action pb.PieceAction
161		var amount int64
162
163		err := rows.Scan(&action, &amount)
164		if err != nil {
165			return nil, ErrBandwidth.Wrap(err)
166		}
167
168		filter(action, amount, usage)
169	}
170
171	return usage, ErrBandwidth.Wrap(rows.Err())
172}
173
// SatelliteSummary returns summary of bandwidth usages for a particular
// satellite between from and to. It delegates to getSatelliteSummary with
// bandwidthFilter, so every piece action is counted.
func (db *bandwidthDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ *bandwidth.Usage, err error) {
	defer mon.Task()(&ctx, satelliteID, from, to)(&err)

	return db.getSatelliteSummary(ctx, satelliteID, from, to, bandwidthFilter)
}
180
// SatelliteEgressSummary returns summary of egress usage for a particular
// satellite between from and to. It delegates to getSatelliteSummary with
// egressFilter, so only get, get audit and get repair actions are counted.
func (db *bandwidthDB) SatelliteEgressSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ *bandwidth.Usage, err error) {
	defer mon.Task()(&ctx, satelliteID, from, to)(&err)

	return db.getSatelliteSummary(ctx, satelliteID, from, to, egressFilter)
}
187
// SatelliteIngressSummary returns summary of ingress usage for a particular
// satellite between from and to. It delegates to getSatelliteSummary with
// ingressFilter, so only put and put repair actions are counted.
func (db *bandwidthDB) SatelliteIngressSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ *bandwidth.Usage, err error) {
	defer mon.Task()(&ctx, satelliteID, from, to)(&err)

	return db.getSatelliteSummary(ctx, satelliteID, from, to, ingressFilter)
}
194
195// getSummary returns bandwidth data for a particular satellite.
196func (db *bandwidthDB) getSatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time, filter actionFilter) (_ *bandwidth.Usage, err error) {
197	defer mon.Task()(&ctx, satelliteID, from, to)(&err)
198
199	from, to = from.UTC(), to.UTC()
200
201	query := `SELECT action, sum(a) amount from(
202			SELECT action, sum(amount) a
203				FROM bandwidth_usage
204				WHERE datetime(?) <= created_at AND created_at <= datetime(?)
205				AND satellite_id = ?
206				GROUP BY action
207			UNION ALL
208			SELECT action, sum(amount) a
209				FROM bandwidth_usage_rollups
210				WHERE datetime(?) <= interval_start AND interval_start <= datetime(?)
211				AND satellite_id = ?
212				GROUP BY action
213		) GROUP BY action;`
214
215	rows, err := db.QueryContext(ctx, query, from, to, satelliteID, from, to, satelliteID)
216	if err != nil {
217		return nil, ErrBandwidth.Wrap(err)
218	}
219	defer func() {
220		err = ErrBandwidth.Wrap(errs.Combine(err, rows.Close()))
221	}()
222
223	usage := new(bandwidth.Usage)
224	for rows.Next() {
225		var action pb.PieceAction
226		var amount int64
227
228		err := rows.Scan(&action, &amount)
229		if err != nil {
230			return nil, err
231		}
232
233		filter(action, amount, usage)
234	}
235
236	return usage, ErrBandwidth.Wrap(rows.Err())
237}
238
239// SummaryBySatellite returns summary of bandwidth usage grouping by satellite.
240func (db *bandwidthDB) SummaryBySatellite(ctx context.Context, from, to time.Time) (_ map[storj.NodeID]*bandwidth.Usage, err error) {
241	defer mon.Task()(&ctx)(&err)
242
243	entries := map[storj.NodeID]*bandwidth.Usage{}
244
245	from, to = from.UTC(), to.UTC()
246
247	rows, err := db.QueryContext(ctx, `
248	SELECT satellite_id, action, sum(a) amount from(
249		SELECT satellite_id, action, sum(amount) a
250			FROM bandwidth_usage
251			WHERE datetime(?) <= created_at AND created_at <= datetime(?)
252			GROUP BY satellite_id, action
253		UNION ALL
254		SELECT satellite_id, action, sum(amount) a
255			FROM bandwidth_usage_rollups
256			WHERE datetime(?) <= interval_start AND interval_start <= datetime(?)
257			GROUP BY satellite_id, action
258	) GROUP BY satellite_id, action;
259		`, from, to, from, to)
260	if err != nil {
261		if errors.Is(err, sql.ErrNoRows) {
262			return entries, nil
263		}
264		return nil, ErrBandwidth.Wrap(err)
265	}
266	defer func() { err = errs.Combine(err, rows.Close()) }()
267
268	for rows.Next() {
269		var satelliteID storj.NodeID
270		var action pb.PieceAction
271		var amount int64
272
273		err := rows.Scan(&satelliteID, &action, &amount)
274		if err != nil {
275			return nil, ErrBandwidth.Wrap(err)
276		}
277
278		entry, ok := entries[satelliteID]
279		if !ok {
280			entry = &bandwidth.Usage{}
281			entries[satelliteID] = entry
282		}
283
284		entry.Include(action, amount)
285	}
286
287	return entries, ErrBandwidth.Wrap(rows.Err())
288}
289
290// Rollup bandwidth_usage data earlier than the current hour, then delete the rolled up records.
291func (db *bandwidthDB) Rollup(ctx context.Context) (err error) {
292	defer mon.Task()(&ctx)(&err)
293
294	now := time.Now().UTC()
295
296	// Go back an hour to give us room for late persists
297	hour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()).Add(-time.Hour)
298
299	tx, err := db.BeginTx(ctx, nil)
300	if err != nil {
301		return ErrBandwidth.Wrap(err)
302	}
303
304	defer func() {
305		if err == nil {
306			err = tx.Commit()
307		} else {
308			err = errs.Combine(err, tx.Rollback())
309		}
310	}()
311
312	result, err := tx.ExecContext(ctx, `
313		INSERT INTO bandwidth_usage_rollups (interval_start, satellite_id,  action, amount)
314		SELECT datetime(strftime('%Y-%m-%dT%H:00:00', created_at)) created_hr, satellite_id, action, SUM(amount)
315			FROM bandwidth_usage
316		WHERE created_at < datetime(?)
317		GROUP BY created_hr, satellite_id, action
318		ON CONFLICT(interval_start, satellite_id,  action)
319		DO UPDATE SET amount = bandwidth_usage_rollups.amount + excluded.amount;
320
321		DELETE FROM bandwidth_usage WHERE created_at < datetime(?);
322	`, hour, hour)
323	if err != nil {
324		return ErrBandwidth.Wrap(err)
325	}
326
327	_, err = result.RowsAffected()
328	if err != nil {
329		return ErrBandwidth.Wrap(err)
330	}
331
332	return nil
333}
334
// GetDailyRollups returns slice of daily bandwidth usage rollups for provided time range,
// sorted in ascending order.
//
// from and to are converted to UTC and widened with date.DayBoundary: the
// lower bound is the first value it returns for from, the upper bound is the
// second value it returns for to (presumably the start and end of the
// respective days; confirm against date.DayBoundary).
func (db *bandwidthDB) GetDailyRollups(ctx context.Context, from, to time.Time) (_ []bandwidth.UsageRollup, err error) {
	defer mon.Task()(&ctx, from, to)(&err)

	since, _ := date.DayBoundary(from.UTC())
	_, before := date.DayBoundary(to.UTC())

	// The condition is applied to both bandwidth_usage and
	// bandwidth_usage_rollups by getDailyUsageRollups.
	return db.getDailyUsageRollups(ctx,
		"WHERE datetime(?) <= interval_start AND interval_start <= datetime(?)",
		since, before)
}
347
// GetDailySatelliteRollups returns slice of daily bandwidth usage for provided time range,
// sorted in ascending order for a particular satellite.
//
// from and to are converted to UTC and widened with date.DayBoundary: the
// lower bound is the first value it returns for from, the upper bound is the
// second value it returns for to (presumably the start and end of the
// respective days; confirm against date.DayBoundary).
func (db *bandwidthDB) GetDailySatelliteRollups(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []bandwidth.UsageRollup, err error) {
	defer mon.Task()(&ctx, satelliteID, from, to)(&err)

	since, _ := date.DayBoundary(from.UTC())
	_, before := date.DayBoundary(to.UTC())

	// The argument order must match the placeholders in the condition:
	// satellite first, then the two time bounds.
	return db.getDailyUsageRollups(ctx,
		"WHERE satellite_id = ? AND datetime(?) <= interval_start AND interval_start <= datetime(?)",
		satelliteID, since, before)
}
360
361// getDailyUsageRollups returns slice of grouped by date bandwidth usage rollups
362// sorted in ascending order and applied condition if any.
363func (db *bandwidthDB) getDailyUsageRollups(ctx context.Context, cond string, args ...interface{}) (_ []bandwidth.UsageRollup, err error) {
364	defer mon.Task()(&ctx)(&err)
365
366	query := `SELECT action, sum(a) as amount, DATETIME(DATE(interval_start)) as date FROM (
367			SELECT action, sum(amount) as a, created_at AS interval_start
368				FROM bandwidth_usage
369				` + cond + `
370				GROUP BY interval_start, action
371			UNION ALL
372			SELECT action, sum(amount) as a, interval_start
373				FROM bandwidth_usage_rollups
374				` + cond + `
375				GROUP BY interval_start, action
376		) GROUP BY date, action
377		ORDER BY interval_start`
378
379	// duplicate args as they are used twice
380	args = append(args, args...)
381
382	rows, err := db.QueryContext(ctx, query, args...)
383	if err != nil {
384		return nil, ErrBandwidth.Wrap(err)
385	}
386	defer func() {
387		err = ErrBandwidth.Wrap(errs.Combine(err, rows.Close()))
388	}()
389
390	var dates []time.Time
391	usageRollupsByDate := make(map[time.Time]*bandwidth.UsageRollup)
392
393	for rows.Next() {
394		var action int32
395		var amount int64
396		var intervalStartN dbutil.NullTime
397
398		err = rows.Scan(&action, &amount, &intervalStartN)
399		if err != nil {
400			return nil, err
401		}
402
403		intervalStart := intervalStartN.Time
404
405		rollup, ok := usageRollupsByDate[intervalStart]
406		if !ok {
407			rollup = &bandwidth.UsageRollup{
408				IntervalStart: intervalStart,
409			}
410
411			dates = append(dates, intervalStart)
412			usageRollupsByDate[intervalStart] = rollup
413		}
414
415		switch pb.PieceAction(action) {
416		case pb.PieceAction_GET:
417			rollup.Egress.Usage = amount
418		case pb.PieceAction_GET_AUDIT:
419			rollup.Egress.Audit = amount
420		case pb.PieceAction_GET_REPAIR:
421			rollup.Egress.Repair = amount
422		case pb.PieceAction_PUT:
423			rollup.Ingress.Usage = amount
424		case pb.PieceAction_PUT_REPAIR:
425			rollup.Ingress.Repair = amount
426		case pb.PieceAction_DELETE:
427			rollup.Delete = amount
428		}
429	}
430
431	var usageRollups []bandwidth.UsageRollup
432	for _, d := range dates {
433		usageRollups = append(usageRollups, *usageRollupsByDate[d])
434	}
435
436	return usageRollups, ErrBandwidth.Wrap(rows.Err())
437}
438
// getBeginningOfMonth returns midnight UTC on the first day of the month that
// contains now, where the month is determined after converting now to UTC.
func getBeginningOfMonth(now time.Time) time.Time {
	utc := now.UTC()
	return time.Date(utc.Year(), utc.Month(), 1, 0, 0, 0, 0, time.UTC)
}
443