1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package satellitedb 5 6import ( 7 "context" 8 "database/sql" 9 "errors" 10 "reflect" 11 "time" 12 13 "github.com/jackc/pgx/v4" 14 "github.com/zeebo/errs" 15 "go.uber.org/zap" 16 17 "storj.io/common/pb" 18 "storj.io/common/storj" 19 "storj.io/common/uuid" 20 "storj.io/private/dbutil/pgutil" 21 "storj.io/private/dbutil/pgxutil" 22 "storj.io/storj/satellite/orders" 23 "storj.io/storj/satellite/satellitedb/dbx" 24) 25 26const defaultIntervalSeconds = int(time.Hour / time.Second) 27 28var ( 29 // ErrDifferentStorageNodes is returned when ProcessOrders gets orders from different storage nodes. 30 ErrDifferentStorageNodes = errs.Class("different storage nodes") 31 // ErrBucketFromSerial is returned when there is an error trying to get the bucket name from the serial number. 32 ErrBucketFromSerial = errs.Class("bucket from serial number") 33 // ErrUpdateBucketBandwidthSettle is returned when there is an error updating bucket bandwidth. 34 ErrUpdateBucketBandwidthSettle = errs.Class("update bucket bandwidth settle") 35 // ErrProcessOrderWithWindowTx is returned when there is an error with the ProcessOrders transaction. 36 ErrProcessOrderWithWindowTx = errs.Class("process order with window transaction") 37 // ErrGetStoragenodeBandwidthInWindow is returned when there is an error getting all storage node bandwidth for a window. 38 ErrGetStoragenodeBandwidthInWindow = errs.Class("get storagenode bandwidth in window") 39 // ErrCreateStoragenodeBandwidth is returned when there is an error updating storage node bandwidth. 40 ErrCreateStoragenodeBandwidth = errs.Class("create storagenode bandwidth") 41) 42 43type ordersDB struct { 44 db *satelliteDB 45} 46 47// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket. 48func (db *ordersDB) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) { 49 defer mon.Task()(&ctx)(&err) 50 51 return pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error { 52 var batch pgx.Batch 53 54 // TODO decide if we need to have transaction here 55 batch.Queue(`START TRANSACTION`) 56 57 statement := db.db.Rebind( 58 `INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) 59 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 60 ON CONFLICT(bucket_name, project_id, interval_start, action) 61 DO UPDATE SET allocated = bucket_bandwidth_rollups.allocated + ?`, 62 ) 63 batch.Queue(statement, bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, uint64(amount), 0, uint64(amount)) 64 65 if action == pb.PieceAction_GET { 66 dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC) 67 statement = db.db.Rebind( 68 `INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead) 69 VALUES (?, ?, ?, ?, ?) 70 ON CONFLICT(project_id, interval_day) 71 DO UPDATE SET egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::BIGINT`, 72 ) 73 batch.Queue(statement, projectID[:], dailyInterval, uint64(amount), 0, 0) 74 } 75 76 batch.Queue(`COMMIT TRANSACTION`) 77 78 results := conn.SendBatch(ctx, &batch) 79 defer func() { err = errs.Combine(err, results.Close()) }() 80 81 var errlist errs.Group 82 for i := 0; i < batch.Len(); i++ { 83 _, err := results.Exec() 84 errlist.Add(err) 85 } 86 87 return errlist.Err() 88 }) 89} 90 91// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket. 92func (db *ordersDB) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) (err error) { 93 defer mon.Task()(&ctx)(&err) 94 95 return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { 96 statement := db.db.Rebind( 97 `INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) 98 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 99 ON CONFLICT(bucket_name, project_id, interval_start, action) 100 DO UPDATE SET settled = bucket_bandwidth_rollups.settled + ?`, 101 ) 102 _, err = db.db.ExecContext(ctx, statement, 103 bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, 0, 0, uint64(settledAmount), uint64(settledAmount), 104 ) 105 if err != nil { 106 return ErrUpdateBucketBandwidthSettle.Wrap(err) 107 } 108 109 if action == pb.PieceAction_GET { 110 dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC) 111 statement = tx.Rebind( 112 `INSERT INTO project_bandwidth_daily_rollups (project_id, interval_day, egress_allocated, egress_settled, egress_dead) 113 VALUES (?, ?, ?, ?, ?) 114 ON CONFLICT(project_id, interval_day) 115 DO UPDATE SET 116 egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::BIGINT, 117 egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::BIGINT`, 118 ) 119 _, err = tx.Tx.ExecContext(ctx, statement, projectID[:], dailyInterval, 0, uint64(settledAmount), uint64(deadAmount)) 120 if err != nil { 121 return err 122 } 123 } 124 return nil 125 }) 126} 127 128// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket. 129func (db *ordersDB) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) { 130 defer mon.Task()(&ctx)(&err) 131 132 statement := db.db.Rebind( 133 `INSERT INTO bucket_bandwidth_rollups (bucket_name, project_id, interval_start, interval_seconds, action, inline, allocated, settled) 134 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 135 ON CONFLICT(bucket_name, project_id, interval_start, action) 136 DO UPDATE SET inline = bucket_bandwidth_rollups.inline + ?`, 137 ) 138 _, err = db.db.ExecContext(ctx, statement, 139 bucketName, projectID[:], intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), 0, 0, uint64(amount), 140 ) 141 if err != nil { 142 return err 143 } 144 return nil 145} 146 147// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node for the given intervalStart time. 148func (db *ordersDB) UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) (err error) { 149 defer mon.Task()(&ctx)(&err) 150 151 statement := db.db.Rebind( 152 `INSERT INTO storagenode_bandwidth_rollups (storagenode_id, interval_start, interval_seconds, action, settled) 153 VALUES (?, ?, ?, ?, ?) 154 ON CONFLICT(storagenode_id, interval_start, action) 155 DO UPDATE SET settled = storagenode_bandwidth_rollups.settled + ?`, 156 ) 157 _, err = db.db.ExecContext(ctx, statement, 158 storageNode.Bytes(), intervalStart.UTC(), defaultIntervalSeconds, action, uint64(amount), uint64(amount), 159 ) 160 if err != nil { 161 return err 162 } 163 return nil 164} 165 166// GetBucketBandwidth gets total bucket bandwidth from period of time. 167func (db *ordersDB) GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (_ int64, err error) { 168 defer mon.Task()(&ctx)(&err) 169 170 var sum *int64 171 query := `SELECT SUM(settled) FROM bucket_bandwidth_rollups WHERE bucket_name = ? AND project_id = ? AND interval_start > ? AND interval_start <= ?` 172 err = db.db.QueryRow(ctx, db.db.Rebind(query), bucketName, projectID[:], from.UTC(), to.UTC()).Scan(&sum) 173 if errors.Is(err, sql.ErrNoRows) || sum == nil { 174 return 0, nil 175 } 176 return *sum, Error.Wrap(err) 177} 178 179// GetStorageNodeBandwidth gets total storage node bandwidth from period of time. 180func (db *ordersDB) GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (_ int64, err error) { 181 defer mon.Task()(&ctx)(&err) 182 183 var sum1, sum2 int64 184 185 err1 := db.db.QueryRow(ctx, db.db.Rebind(` 186 SELECT COALESCE(SUM(settled), 0) 187 FROM storagenode_bandwidth_rollups 188 WHERE storagenode_id = ? 189 AND interval_start > ? 190 AND interval_start <= ? 191 `), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum1) 192 193 err2 := db.db.QueryRow(ctx, db.db.Rebind(` 194 SELECT COALESCE(SUM(settled), 0) 195 FROM storagenode_bandwidth_rollups_phase2 196 WHERE storagenode_id = ? 197 AND interval_start > ? 198 AND interval_start <= ? 199 `), nodeID.Bytes(), from.UTC(), to.UTC()).Scan(&sum2) 200 201 if err1 != nil && !errors.Is(err1, sql.ErrNoRows) { 202 return 0, err1 203 } else if err2 != nil && !errors.Is(err2, sql.ErrNoRows) { 204 return 0, err2 205 } 206 207 return sum1 + sum2, nil 208} 209 210func (db *ordersDB) UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []orders.BucketBandwidthRollup) (err error) { 211 defer mon.Task()(&ctx)(&err) 212 213 return db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { 214 defer mon.Task()(&ctx)(&err) 215 216 if len(rollups) == 0 { 217 return nil 218 } 219 220 orders.SortBucketBandwidthRollups(rollups) 221 222 intervalStart = intervalStart.UTC() 223 intervalStart = time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, time.UTC) 224 225 var bucketNames [][]byte 226 var projectIDs [][]byte 227 var actionSlice []int32 228 var inlineSlice []int64 229 var allocatedSlice []int64 230 var settledSlice []int64 231 232 type bandwidth struct { 233 Allocated int64 234 Settled int64 235 Dead int64 236 } 237 projectRUMap := make(map[uuid.UUID]bandwidth) 238 239 for _, rollup := range rollups { 240 rollup := rollup 241 bucketNames = append(bucketNames, []byte(rollup.BucketName)) 242 projectIDs = append(projectIDs, rollup.ProjectID[:]) 243 actionSlice = append(actionSlice, int32(rollup.Action)) 244 inlineSlice = append(inlineSlice, rollup.Inline) 245 allocatedSlice = append(allocatedSlice, rollup.Allocated) 246 settledSlice = append(settledSlice, rollup.Settled) 247 248 if rollup.Action == pb.PieceAction_GET { 249 b := projectRUMap[rollup.ProjectID] 250 b.Allocated += rollup.Allocated 251 b.Settled += rollup.Settled 252 b.Dead += rollup.Dead 253 projectRUMap[rollup.ProjectID] = b 254 } 255 } 256 257 _, err = tx.Tx.ExecContext(ctx, ` 258 INSERT INTO bucket_bandwidth_rollups ( 259 bucket_name, project_id, 260 interval_start, interval_seconds, 261 action, inline, allocated, settled) 262 SELECT 263 unnest($1::bytea[]), unnest($2::bytea[]), 264 $3, $4, 265 unnest($5::int4[]), unnest($6::bigint[]), unnest($7::bigint[]), unnest($8::bigint[]) 266 ON CONFLICT(bucket_name, project_id, interval_start, action) 267 DO UPDATE SET 268 allocated = bucket_bandwidth_rollups.allocated + EXCLUDED.allocated, 269 inline = bucket_bandwidth_rollups.inline + EXCLUDED.inline, 270 settled = bucket_bandwidth_rollups.settled + EXCLUDED.settled`, 271 pgutil.ByteaArray(bucketNames), pgutil.ByteaArray(projectIDs), 272 intervalStart, defaultIntervalSeconds, 273 pgutil.Int4Array(actionSlice), pgutil.Int8Array(inlineSlice), pgutil.Int8Array(allocatedSlice), pgutil.Int8Array(settledSlice)) 274 if err != nil { 275 db.db.log.Error("Bucket bandwidth rollup batch flush failed.", zap.Error(err)) 276 } 277 278 projectRUIDs := make([]uuid.UUID, 0, len(projectRUMap)) 279 var projectRUAllocated []int64 280 var projectRUSettled []int64 281 var projectRUDead []int64 282 dailyInterval := time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), 0, 0, 0, 0, time.UTC) 283 284 for projectID, v := range projectRUMap { 285 projectRUIDs = append(projectRUIDs, projectID) 286 projectRUAllocated = append(projectRUAllocated, v.Allocated) 287 projectRUSettled = append(projectRUSettled, v.Settled) 288 projectRUDead = append(projectRUDead, v.Dead) 289 } 290 291 if len(projectRUIDs) > 0 { 292 _, err = tx.Tx.ExecContext(ctx, ` 293 INSERT INTO project_bandwidth_daily_rollups(project_id, interval_day, egress_allocated, egress_settled, egress_dead) 294 SELECT unnest($1::bytea[]), $2, unnest($3::bigint[]), unnest($4::bigint[]), unnest($5::bigint[]) 295 ON CONFLICT(project_id, interval_day) 296 DO UPDATE SET 297 egress_allocated = project_bandwidth_daily_rollups.egress_allocated + EXCLUDED.egress_allocated::bigint, 298 egress_settled = project_bandwidth_daily_rollups.egress_settled + EXCLUDED.egress_settled::bigint, 299 egress_dead = project_bandwidth_daily_rollups.egress_dead + EXCLUDED.egress_dead::bigint 300 `, pgutil.UUIDArray(projectRUIDs), dailyInterval, pgutil.Int8Array(projectRUAllocated), pgutil.Int8Array(projectRUSettled), pgutil.Int8Array(projectRUDead)) 301 if err != nil { 302 db.db.log.Error("Project bandwidth daily rollup batch flush failed.", zap.Error(err)) 303 } 304 } 305 return err 306 }) 307} 308 309// 310// transaction/batch methods 311// 312 313// UpdateStoragenodeBandwidthSettleWithWindow adds a record to for each action and settled amount. 314// If any of these orders already exist in the database, then all of these orders have already been processed. 315// Orders within a single window may only be processed once to prevent double spending. 316func (db *ordersDB) UpdateStoragenodeBandwidthSettleWithWindow(ctx context.Context, storageNodeID storj.NodeID, actionAmounts map[int32]int64, window time.Time) (status pb.SettlementWithWindowResponse_Status, alreadyProcessed bool, err error) { 317 defer mon.Task()(&ctx)(&err) 318 319 var batchStatus pb.SettlementWithWindowResponse_Status 320 var retryCount int 321 for { 322 err = db.db.WithTx(ctx, func(ctx context.Context, tx *dbx.Tx) error { 323 // try to get all rows from the storage node bandwidth table for the 1 hr window 324 // if there are already existing rows for the 1 hr window that means these orders have 325 // already been processed 326 rows, err := tx.All_StoragenodeBandwidthRollup_By_StoragenodeId_And_IntervalStart(ctx, 327 dbx.StoragenodeBandwidthRollup_StoragenodeId(storageNodeID[:]), 328 dbx.StoragenodeBandwidthRollup_IntervalStart(window), 329 ) 330 if err != nil { 331 return ErrGetStoragenodeBandwidthInWindow.Wrap(err) 332 } 333 334 if len(rows) != 0 { 335 // if there are already rows in the storagenode bandwidth table for this 1 hr window 336 // that means these orders have already been processed 337 // if these orders that the storagenode is trying to process again match what in the 338 // storagenode bandwidth table, then send a successful response to the storagenode 339 // so they don't keep trying to settle these orders again 340 // if these orders do not match what we have in the storage node bandwidth table then send 341 // back an invalid response 342 if SettledAmountsMatch(rows, actionAmounts) { 343 batchStatus = pb.SettlementWithWindowResponse_ACCEPTED 344 alreadyProcessed = true 345 return nil 346 } 347 batchStatus = pb.SettlementWithWindowResponse_REJECTED 348 return nil 349 } 350 // if there aren't any rows in the storagenode bandwidth table for this 1 hr window 351 // that means these orders have not been processed before so we can continue to process them 352 for action, amount := range actionAmounts { 353 _, err := tx.Create_StoragenodeBandwidthRollup(ctx, 354 dbx.StoragenodeBandwidthRollup_StoragenodeId(storageNodeID[:]), 355 dbx.StoragenodeBandwidthRollup_IntervalStart(window), 356 dbx.StoragenodeBandwidthRollup_IntervalSeconds(uint(defaultIntervalSeconds)), 357 dbx.StoragenodeBandwidthRollup_Action(uint(action)), 358 dbx.StoragenodeBandwidthRollup_Settled(uint64(amount)), 359 dbx.StoragenodeBandwidthRollup_Create_Fields{}, 360 ) 361 if err != nil { 362 return ErrCreateStoragenodeBandwidth.Wrap(err) 363 } 364 } 365 366 batchStatus = pb.SettlementWithWindowResponse_ACCEPTED 367 return nil 368 }) 369 if dbx.IsConstraintError(err) { 370 retryCount++ 371 if retryCount > 5 { 372 return 0, alreadyProcessed, errs.New("process order with window retry count too high") 373 } 374 continue 375 } else if err != nil { 376 return 0, alreadyProcessed, ErrProcessOrderWithWindowTx.Wrap(err) 377 } 378 break 379 } 380 381 return batchStatus, alreadyProcessed, nil 382} 383 384// SettledAmountsMatch checks if database rows match the orders. If the settled amount for 385// each action are not the same then false is returned. 386func SettledAmountsMatch(rows []*dbx.StoragenodeBandwidthRollup, orderActionAmounts map[int32]int64) bool { 387 rowsSumByAction := map[int32]int64{} 388 for _, row := range rows { 389 rowsSumByAction[int32(row.Action)] += int64(row.Settled) 390 } 391 392 return reflect.DeepEqual(rowsSumByAction, orderActionAmounts) 393} 394