1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package satellitedb
5
6import (
7	"context"
8	"database/sql"
9	"errors"
10	"reflect"
11	"time"
12
13	"github.com/jackc/pgx/v4"
14	"github.com/zeebo/errs"
15	"go.uber.org/zap"
16
17	"storj.io/common/pb"
18	"storj.io/common/storj"
19	"storj.io/common/uuid"
20	"storj.io/private/dbutil/pgutil"
21	"storj.io/private/dbutil/pgxutil"
22	"storj.io/storj/satellite/orders"
23	"storj.io/storj/satellite/satellitedb/dbx"
24)
25
// defaultIntervalSeconds is one hour expressed in seconds. It is the
// interval_seconds value written with the bandwidth rollup rows in this file.
const defaultIntervalSeconds = int(time.Hour / time.Second)
27
// Error classes returned by the orders database layer.
var (
	// ErrDifferentStorageNodes is returned when ProcessOrders gets orders from different storage nodes.
	ErrDifferentStorageNodes = errs.Class("different storage nodes")
	// ErrBucketFromSerial is returned when there is an error trying to get the bucket name from the serial number.
	ErrBucketFromSerial = errs.Class("bucket from serial number")
	// ErrUpdateBucketBandwidthSettle is returned when there is an error updating bucket bandwidth.
	ErrUpdateBucketBandwidthSettle = errs.Class("update bucket bandwidth settle")
	// ErrProcessOrderWithWindowTx is returned when the transaction that settles
	// orders for a window fails with a non-constraint error.
	ErrProcessOrderWithWindowTx = errs.Class("process order with window transaction")
	// ErrGetStoragenodeBandwidthInWindow is returned when there is an error getting all storage node bandwidth for a window.
	ErrGetStoragenodeBandwidthInWindow = errs.Class("get storagenode bandwidth in window")
	// ErrCreateStoragenodeBandwidth is returned when there is an error creating
	// a storage node bandwidth rollup row.
	ErrCreateStoragenodeBandwidth = errs.Class("create storagenode bandwidth")
)
42
// ordersDB groups the bandwidth rollup queries and updates defined in this
// file; all of them run against the wrapped satellite database.
type ordersDB struct {
	db *satelliteDB // database handle every method issues its statements through
}
46
47// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket.
48func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
49	defer mon.Task()(&ctx)(&err)
50
51	return pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
52		var batch pgx.Batch
53
54		// TODO decide if we need to have transaction here
55		batch.Queue(`START TRANSACTION`)
56
57		statement := db.db.Rebind(
58			`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
59					VALUES (?, ?, ?, ?, ?, ?, ?, ?)
60					ON CONFLICT(bucket_name, project_id, interval_start, action)
61					DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`,
62		)
63		batch.Queue(statement, bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount))
64
65		if action == pb.PieceAction_GET {
66			dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
67			statement = db.db.Rebind(
68				`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
69				VALUES (?, ?, ?, ?, ?)
70				ON CONFLICT(project_id, interval_day)
71				DO UPDATE SET egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::BIGINT`,
72			)
73			batch.Queue(statement, projectID[:], dailyInterval, uint64(amount), 0, 0)
74		}
75
76		batch.Queue(`COMMIT TRANSACTION`)
77
78		results := conn.SendBatch(ctx, &batch)
79		defer func() { err = errs.Combine(err, results.Close()) }()
80
81		var errlist errs.Group
82		for i := 0; i < batch.Len(); i++ {
83			_, err := results.Exec()
84			errlist.Add(err)
85		}
86
87		return errlist.Err()
88	})
89}
90
91// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket.
92func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) (err error) {
93	defer mon.Task()(&ctx)(&err)
94
95	return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
96		statement := db.db.Rebind(
97			`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
98			VALUES (?, ?, ?, ?, ?, ?, ?, ?)
99			ON CONFLICT(bucket_name, project_id, interval_start, action)
100			DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`,
101		)
102		_, err = db.db.ExecContext(ctx, statement,
103			bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount),
104		)
105		if err != nil {
106			return ErrUpdateBucketBandwidthSettle.Wrap(err)
107		}
108
109		if action == pb.PieceAction_GET {
110			dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
111			statement = tx.Rebind(
112				`INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead)
113				VALUES (?, ?, ?, ?, ?)
114				ON CONFLICT(project_id, interval_day)
115				DO UPDATE SET
116					egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::BIGINT,
117					egress_dead    = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::BIGINT`,
118			)
119			_, err = tx.Tx.ExecContext(ctx, statement, projectID[:], dailyInterval, 0, uint64(settledAmount), uint64(deadAmount))
120			if err != nil {
121				return err
122			}
123		}
124		return nil
125	})
126}
127
128// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket.
129func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
130	defer mon.Task()(&ctx)(&err)
131
132	statement := db.db.Rebind(
133		`INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled)
134		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
135		ON CONFLICT(bucket_name, project_id, interval_start, action)
136		DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`,
137	)
138	_, err = db.db.ExecContext(ctx, statement,
139		bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount),
140	)
141	if err != nil {
142		return err
143	}
144	return nil
145}
146
147// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node for the given intervalStart time.
148func (db *ordersDB) UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) {
149	defer mon.Task()(&ctx)(&err)
150
151	statement := db.db.Rebind(
152		`INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, settled)
153		VALUES (?, ?, ?, ?, ?)
154		ON CONFLICT(storagenode_id, interval_start, action)
155		DO UPDATE SET settled = storagenode_bandwidth_rollups.settled + ?`,
156	)
157	_, err = db.db.ExecContext(ctx, statement,
158		storageNode.Bytes(), intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), uint64(amount),
159	)
160	if err != nil {
161		return err
162	}
163	return nil
164}
165
166// GetBucketBandwidth gets total bucket bandwidth from period of time.
167func (db *ordersDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (_ int64, err error) {
168	defer mon.Task()(&ctx)(&err)
169
170	var sum *int64
171	query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE bucket_name = ? AND project_id = ? AND interval_start > ? AND interval_start <= ?`
172	err = db.db.QueryRow(ctx, db.db.Rebind(query), bucketName, projectID[:], from.UTC(), to.UTC()).Scan(&sum)
173	if errors.Is(err, sql.ErrNoRows) || sum == nil {
174		return 0, nil
175	}
176	return *sum, Error.Wrap(err)
177}
178
179// GetStorageNodeBandwidth gets total storage node bandwidth from period of time.
180func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (_ int64, err error) {
181	defer mon.Task()(&ctx)(&err)
182
183	var sum1, sum2 int64
184
185	err1 := db.db.QueryRow(ctx, db.db.Rebind(`
186		SELECT COALESCE(SUM(settled), 0)
187		FROM storagenode_bandwidth_rollups
188		WHERE storagenode_id = ?
189		  AND interval_start > ?
190		  AND interval_start <= ?
191	`), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum1)
192
193	err2 := db.db.QueryRow(ctx, db.db.Rebind(`
194		SELECT COALESCE(SUM(settled), 0)
195		FROM storagenode_bandwidth_rollups_phase2
196		WHERE storagenode_id = ?
197		  AND interval_start > ?
198		  AND interval_start <= ?
199	`), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum2)
200
201	if err1 != nil && !errors.Is(err1, sql.ErrNoRows) {
202		return 0, err1
203	} else if err2 != nil && !errors.Is(err2, sql.ErrNoRows) {
204		return 0, err2
205	}
206
207	return sum1 + sum2, nil
208}
209
210func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []orders.BucketBandwidthRollup) (err error) {
211	defer mon.Task()(&ctx)(&err)
212
213	return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
214		defer mon.Task()(&ctx)(&err)
215
216		if len(rollups) == 0 {
217			return nil
218		}
219
220		orders.SortBucketBandwidthRollups(rollups)
221
222		intervalStart = intervalStart.UTC()
223		intervalStart = time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, time.UTC)
224
225		var bucketNames [][]byte
226		var projectIDs [][]byte
227		var actionSlice []int32
228		var inlineSlice []int64
229		var allocatedSlice []int64
230		var settledSlice []int64
231
232		type bandwidth struct {
233			Allocated int64
234			Settled   int64
235			Dead      int64
236		}
237		projectRUMap := make(map[uuid.UUID]bandwidth)
238
239		for _, rollup := range rollups {
240			rollup := rollup
241			bucketNames = append(bucketNames, []byte(rollup.BucketName))
242			projectIDs = append(projectIDs, rollup.ProjectID[:])
243			actionSlice = append(actionSlice, int32(rollup.Action))
244			inlineSlice = append(inlineSlice, rollup.Inline)
245			allocatedSlice = append(allocatedSlice, rollup.Allocated)
246			settledSlice = append(settledSlice, rollup.Settled)
247
248			if rollup.Action == pb.PieceAction_GET {
249				b := projectRUMap[rollup.ProjectID]
250				b.Allocated += rollup.Allocated
251				b.Settled += rollup.Settled
252				b.Dead += rollup.Dead
253				projectRUMap[rollup.ProjectID] = b
254			}
255		}
256
257		_, err = tx.Tx.ExecContext(ctx, `
258		INSERT INTO bucket_bandwidth_rollups (
259			bucket_name, project_id,
260			interval_start, interval_seconds,
261			action, inline, allocated, settled)
262		SELECT
263			unnest($1::bytea[]), unnest($2::bytea[]),
264			$3, $4,
265			unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[])
266		ON CONFLICT(bucket_name, project_id, interval_start, action)
267		DO UPDATE SET
268			allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated,
269			inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline,
270			settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled`,
271			pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs),
272			intervalStart, defaultIntervalSeconds,
273			pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice))
274		if err != nil {
275			db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err))
276		}
277
278		projectRUIDs := make([]uuid.UUID, 0, len(projectRUMap))
279		var projectRUAllocated []int64
280		var projectRUSettled []int64
281		var projectRUDead []int64
282		dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC)
283
284		for projectID, v := range projectRUMap {
285			projectRUIDs = append(projectRUIDs, projectID)
286			projectRUAllocated = append(projectRUAllocated, v.Allocated)
287			projectRUSettled = append(projectRUSettled, v.Settled)
288			projectRUDead = append(projectRUDead, v.Dead)
289		}
290
291		if len(projectRUIDs) > 0 {
292			_, err = tx.Tx.ExecContext(ctx, `
293				INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled, egress_dead)
294					SELECT unnest($1::bytea[]), $2, unnest($3::bigint[]), unnest($4::bigint[]), unnest($5::bigint[])
295				ON CONFLICT(project_id, interval_day)
296				DO UPDATE SET
297					egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint,
298					egress_settled   = project_bandwidth_daily_rollups.egress_settled   + EXCLUDED.egress_settled::bigint,
299					egress_dead      = project_bandwidth_daily_rollups.egress_dead      + EXCLUDED.egress_dead::bigint
300			`, pgutil.UUIDArray(projectRUIDs), dailyInterval, pgutil.Int8Array(projectRUAllocated), pgutil.Int8Array(projectRUSettled), pgutil.Int8Array(projectRUDead))
301			if err != nil {
302				db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err))
303			}
304		}
305		return err
306	})
307}
308
309//
310// transaction/batch methods
311//
312
313// UpdateStoragenodeBandwidthSettleWithWindow adds a record to for each action and settled amount.
314// If any of these orders already exist in the database, then all of these orders have already been processed.
315// Orders within a single window may only be processed once to prevent double spending.
316func (db *ordersDB) UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error) {
317	defer mon.Task()(&ctx)(&err)
318
319	var batchStatus pb.SettlementWithWindowResponse_Status
320	var retryCount int
321	for {
322		err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error {
323			// try to get all rows from the storage node bandwidth table for the 1 hr window
324			// if there are already existing rows for the 1 hr window that means these orders have
325			// already been processed
326			rows, err := tx.All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx,
327				dbx.StoragenodeBandwidthRollup_StoragenodeId(storageNodeID[:]),
328				dbx.StoragenodeBandwidthRollup_IntervalStart(window),
329			)
330			if err != nil {
331				return ErrGetStoragenodeBandwidthInWindow.Wrap(err)
332			}
333
334			if len(rows) != 0 {
335				// if there are already rows in the storagenode bandwidth table for this 1 hr window
336				// that means these orders have already been processed
337				// if these orders that the storagenode is trying to process again match what in the
338				// storagenode bandwidth table, then send a successful response to the storagenode
339				// so they don't keep trying to settle these orders again
340				// if these orders do not match what we have in the storage node bandwidth table then send
341				// back an invalid response
342				if SettledAmountsMatch(rows, actionAmounts) {
343					batchStatus = pb.SettlementWithWindowResponse_ACCEPTED
344					alreadyProcessed = true
345					return nil
346				}
347				batchStatus = pb.SettlementWithWindowResponse_REJECTED
348				return nil
349			}
350			// if there aren't any rows in the storagenode bandwidth table for this 1 hr window
351			// that means these orders have not been processed before so we can continue to process them
352			for action, amount := range actionAmounts {
353				_, err := tx.Create_StoragenodeBandwidthRollup(ctx,
354					dbx.StoragenodeBandwidthRollup_StoragenodeId(storageNodeID[:]),
355					dbx.StoragenodeBandwidthRollup_IntervalStart(window),
356					dbx.StoragenodeBandwidthRollup_IntervalSeconds(uint(defaultIntervalSeconds)),
357					dbx.StoragenodeBandwidthRollup_Action(uint(action)),
358					dbx.StoragenodeBandwidthRollup_Settled(uint64(amount)),
359					dbx.StoragenodeBandwidthRollup_Create_Fields{},
360				)
361				if err != nil {
362					return ErrCreateStoragenodeBandwidth.Wrap(err)
363				}
364			}
365
366			batchStatus = pb.SettlementWithWindowResponse_ACCEPTED
367			return nil
368		})
369		if dbx.IsConstraintError(err) {
370			retryCount++
371			if retryCount > 5 {
372				return 0, alreadyProcessed, errs.New("process order with window retry count too high")
373			}
374			continue
375		} else if err != nil {
376			return 0, alreadyProcessed, ErrProcessOrderWithWindowTx.Wrap(err)
377		}
378		break
379	}
380
381	return batchStatus, alreadyProcessed, nil
382}
383
384// SettledAmountsMatch checks if database rows match the orders. If the settled amount for
385// each action are not the same then false is returned.
386func SettledAmountsMatch(rows []*dbx.StoragenodeBandwidthRollup, orderActionAmounts map[int32]int64) bool {
387	rowsSumByAction := map[int32]int64{}
388	for _, row := range rows {
389		rowsSumByAction[int32(row.Action)] += int64(row.Settled)
390	}
391
392	return reflect.DeepEqual(rowsSumByAction, orderActionAmounts)
393}
394