1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package storagenodedb 5 6import ( 7 "context" 8 "database/sql" 9 "errors" 10 "sync" 11 "time" 12 13 "github.com/zeebo/errs" 14 15 "storj.io/common/pb" 16 "storj.io/common/storj" 17 "storj.io/private/dbutil" 18 "storj.io/storj/private/date" 19 "storj.io/storj/storagenode/bandwidth" 20) 21 22// ErrBandwidth represents errors from the bandwidthdb database. 23var ErrBandwidth = errs.Class("bandwidthdb") 24 25// BandwidthDBName represents the database name. 26const BandwidthDBName = "bandwidth" 27 28type bandwidthDB struct { 29 // Moved to top of struct to resolve alignment issue with atomic operations on ARM 30 usedSpace int64 31 usedMu sync.RWMutex 32 usedSince time.Time 33 34 dbContainerImpl 35} 36 37// Add adds bandwidth usage to the table. 38func (db *bandwidthDB) Add(ctx context.Context, satelliteID storj.NodeID, action pb.PieceAction, amount int64, created time.Time) (err error) { 39 defer mon.Task()(&ctx)(&err) 40 _, err = db.ExecContext(ctx, ` 41 INSERT INTO 42 bandwidth_usage(satellite_id, action, amount, created_at) 43 VALUES(?, ?, ?, datetime(?))`, satelliteID, action, amount, created.UTC()) 44 if err == nil { 45 db.usedMu.Lock() 46 defer db.usedMu.Unlock() 47 48 beginningOfMonth := getBeginningOfMonth(created.UTC()) 49 if beginningOfMonth.Equal(db.usedSince) { 50 db.usedSpace += amount 51 } else if beginningOfMonth.After(db.usedSince) { 52 usage, err := db.Summary(ctx, beginningOfMonth, time.Now()) 53 if err != nil { 54 return err 55 } 56 db.usedSince = beginningOfMonth 57 db.usedSpace = usage.Total() 58 } 59 } 60 return ErrBandwidth.Wrap(err) 61} 62 63// MonthSummary returns summary of the current months bandwidth usages. 64func (db *bandwidthDB) MonthSummary(ctx context.Context, now time.Time) (_ int64, err error) { 65 defer mon.Task()(&ctx)(&err) 66 67 db.usedMu.RLock() 68 beginningOfMonth := getBeginningOfMonth(now) 69 if beginningOfMonth.Equal(db.usedSince) { 70 defer db.usedMu.RUnlock() 71 return db.usedSpace, nil 72 } 73 db.usedMu.RUnlock() 74 75 usage, err := db.Summary(ctx, beginningOfMonth, now) 76 if err != nil { 77 return 0, err 78 } 79 // Just return the usage, don't update the cache. Let add handle updates 80 return usage.Total(), nil 81} 82 83// actionFilter sums bandwidth depending on piece action type. 84type actionFilter func(action pb.PieceAction, amount int64, usage *bandwidth.Usage) 85 86var ( 87 // ingressFilter sums put and put repair. 88 ingressFilter actionFilter = func(action pb.PieceAction, amount int64, usage *bandwidth.Usage) { 89 switch action { 90 case pb.PieceAction_PUT, pb.PieceAction_PUT_REPAIR: 91 usage.Include(action, amount) 92 } 93 } 94 95 // egressFilter sums get, get audit and get repair. 96 egressFilter actionFilter = func(action pb.PieceAction, amount int64, usage *bandwidth.Usage) { 97 switch action { 98 case pb.PieceAction_GET, pb.PieceAction_GET_AUDIT, pb.PieceAction_GET_REPAIR: 99 usage.Include(action, amount) 100 } 101 } 102 103 // bandwidthFilter sums all bandwidth. 104 bandwidthFilter actionFilter = func(action pb.PieceAction, amount int64, usage *bandwidth.Usage) { 105 usage.Include(action, amount) 106 } 107) 108 109// Summary returns summary of bandwidth usages for all satellites. 110func (db *bandwidthDB) Summary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) { 111 defer mon.Task()(&ctx)(&err) 112 113 return db.getSummary(ctx, from, to, bandwidthFilter) 114} 115 116// EgressSummary returns summary of egress usages for all satellites. 117func (db *bandwidthDB) EgressSummary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) { 118 defer mon.Task()(&ctx)(&err) 119 120 return db.getSummary(ctx, from, to, egressFilter) 121} 122 123// IngressSummary returns summary of ingress usages for all satellites. 124func (db *bandwidthDB) IngressSummary(ctx context.Context, from, to time.Time) (_ *bandwidth.Usage, err error) { 125 defer mon.Task()(&ctx)(&err) 126 127 return db.getSummary(ctx, from, to, ingressFilter) 128} 129 130// getSummary returns bandwidth data for all satellites. 131func (db *bandwidthDB) getSummary(ctx context.Context, from, to time.Time, filter actionFilter) (_ *bandwidth.Usage, err error) { 132 defer mon.Task()(&ctx)(&err) 133 134 usage := &bandwidth.Usage{} 135 136 from, to = from.UTC(), to.UTC() 137 138 rows, err := db.QueryContext(ctx, ` 139 SELECT action, sum(a) amount from( 140 SELECT action, sum(amount) a 141 FROM bandwidth_usage 142 WHERE datetime(?) <= created_at AND created_at <= datetime(?) 143 GROUP BY action 144 UNION ALL 145 SELECT action, sum(amount) a 146 FROM bandwidth_usage_rollups 147 WHERE datetime(?) <= interval_start AND interval_start <= datetime(?) 148 GROUP BY action 149 ) GROUP BY action; 150 `, from, to, from, to) 151 if err != nil { 152 if errors.Is(err, sql.ErrNoRows) { 153 return usage, nil 154 } 155 return nil, ErrBandwidth.Wrap(err) 156 } 157 defer func() { err = errs.Combine(err, rows.Close()) }() 158 159 for rows.Next() { 160 var action pb.PieceAction 161 var amount int64 162 163 err := rows.Scan(&action, &amount) 164 if err != nil { 165 return nil, ErrBandwidth.Wrap(err) 166 } 167 168 filter(action, amount, usage) 169 } 170 171 return usage, ErrBandwidth.Wrap(rows.Err()) 172} 173 174// SatelliteSummary returns summary of bandwidth usages for a particular satellite. 175func (db *bandwidthDB) SatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ *bandwidth.Usage, err error) { 176 defer mon.Task()(&ctx, satelliteID, from, to)(&err) 177 178 return db.getSatelliteSummary(ctx, satelliteID, from, to, bandwidthFilter) 179} 180 181// SatelliteEgressSummary returns summary of egress usage for a particular satellite. 182func (db *bandwidthDB) SatelliteEgressSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ *bandwidth.Usage, err error) { 183 defer mon.Task()(&ctx, satelliteID, from, to)(&err) 184 185 return db.getSatelliteSummary(ctx, satelliteID, from, to, egressFilter) 186} 187 188// SatelliteIngressSummary returns summary of ingress usage for a particular satellite. 189func (db *bandwidthDB) SatelliteIngressSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ *bandwidth.Usage, err error) { 190 defer mon.Task()(&ctx, satelliteID, from, to)(&err) 191 192 return db.getSatelliteSummary(ctx, satelliteID, from, to, ingressFilter) 193} 194 195// getSummary returns bandwidth data for a particular satellite. 196func (db *bandwidthDB) getSatelliteSummary(ctx context.Context, satelliteID storj.NodeID, from, to time.Time, filter actionFilter) (_ *bandwidth.Usage, err error) { 197 defer mon.Task()(&ctx, satelliteID, from, to)(&err) 198 199 from, to = from.UTC(), to.UTC() 200 201 query := `SELECT action, sum(a) amount from( 202 SELECT action, sum(amount) a 203 FROM bandwidth_usage 204 WHERE datetime(?) <= created_at AND created_at <= datetime(?) 205 AND satellite_id = ? 206 GROUP BY action 207 UNION ALL 208 SELECT action, sum(amount) a 209 FROM bandwidth_usage_rollups 210 WHERE datetime(?) <= interval_start AND interval_start <= datetime(?) 211 AND satellite_id = ? 212 GROUP BY action 213 ) GROUP BY action;` 214 215 rows, err := db.QueryContext(ctx, query, from, to, satelliteID, from, to, satelliteID) 216 if err != nil { 217 return nil, ErrBandwidth.Wrap(err) 218 } 219 defer func() { 220 err = ErrBandwidth.Wrap(errs.Combine(err, rows.Close())) 221 }() 222 223 usage := new(bandwidth.Usage) 224 for rows.Next() { 225 var action pb.PieceAction 226 var amount int64 227 228 err := rows.Scan(&action, &amount) 229 if err != nil { 230 return nil, err 231 } 232 233 filter(action, amount, usage) 234 } 235 236 return usage, ErrBandwidth.Wrap(rows.Err()) 237} 238 239// SummaryBySatellite returns summary of bandwidth usage grouping by satellite. 240func (db *bandwidthDB) SummaryBySatellite(ctx context.Context, from, to time.Time) (_ map[storj.NodeID]*bandwidth.Usage, err error) { 241 defer mon.Task()(&ctx)(&err) 242 243 entries := map[storj.NodeID]*bandwidth.Usage{} 244 245 from, to = from.UTC(), to.UTC() 246 247 rows, err := db.QueryContext(ctx, ` 248 SELECT satellite_id, action, sum(a) amount from( 249 SELECT satellite_id, action, sum(amount) a 250 FROM bandwidth_usage 251 WHERE datetime(?) <= created_at AND created_at <= datetime(?) 252 GROUP BY satellite_id, action 253 UNION ALL 254 SELECT satellite_id, action, sum(amount) a 255 FROM bandwidth_usage_rollups 256 WHERE datetime(?) <= interval_start AND interval_start <= datetime(?) 257 GROUP BY satellite_id, action 258 ) GROUP BY satellite_id, action; 259 `, from, to, from, to) 260 if err != nil { 261 if errors.Is(err, sql.ErrNoRows) { 262 return entries, nil 263 } 264 return nil, ErrBandwidth.Wrap(err) 265 } 266 defer func() { err = errs.Combine(err, rows.Close()) }() 267 268 for rows.Next() { 269 var satelliteID storj.NodeID 270 var action pb.PieceAction 271 var amount int64 272 273 err := rows.Scan(&satelliteID, &action, &amount) 274 if err != nil { 275 return nil, ErrBandwidth.Wrap(err) 276 } 277 278 entry, ok := entries[satelliteID] 279 if !ok { 280 entry = &bandwidth.Usage{} 281 entries[satelliteID] = entry 282 } 283 284 entry.Include(action, amount) 285 } 286 287 return entries, ErrBandwidth.Wrap(rows.Err()) 288} 289 290// Rollup bandwidth_usage data earlier than the current hour, then delete the rolled up records. 291func (db *bandwidthDB) Rollup(ctx context.Context) (err error) { 292 defer mon.Task()(&ctx)(&err) 293 294 now := time.Now().UTC() 295 296 // Go back an hour to give us room for late persists 297 hour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()).Add(-time.Hour) 298 299 tx, err := db.BeginTx(ctx, nil) 300 if err != nil { 301 return ErrBandwidth.Wrap(err) 302 } 303 304 defer func() { 305 if err == nil { 306 err = tx.Commit() 307 } else { 308 err = errs.Combine(err, tx.Rollback()) 309 } 310 }() 311 312 result, err := tx.ExecContext(ctx, ` 313 INSERT INTO bandwidth_usage_rollups (interval_start, satellite_id, action, amount) 314 SELECT datetime(strftime('%Y-%m-%dT%H:00:00', created_at)) created_hr, satellite_id, action, SUM(amount) 315 FROM bandwidth_usage 316 WHERE created_at < datetime(?) 317 GROUP BY created_hr, satellite_id, action 318 ON CONFLICT(interval_start, satellite_id, action) 319 DO UPDATE SET amount = bandwidth_usage_rollups.amount + excluded.amount; 320 321 DELETE FROM bandwidth_usage WHERE created_at < datetime(?); 322 `, hour, hour) 323 if err != nil { 324 return ErrBandwidth.Wrap(err) 325 } 326 327 _, err = result.RowsAffected() 328 if err != nil { 329 return ErrBandwidth.Wrap(err) 330 } 331 332 return nil 333} 334 335// GetDailyRollups returns slice of daily bandwidth usage rollups for provided time range, 336// sorted in ascending order. 337func (db *bandwidthDB) GetDailyRollups(ctx context.Context, from, to time.Time) (_ []bandwidth.UsageRollup, err error) { 338 defer mon.Task()(&ctx, from, to)(&err) 339 340 since, _ := date.DayBoundary(from.UTC()) 341 _, before := date.DayBoundary(to.UTC()) 342 343 return db.getDailyUsageRollups(ctx, 344 "WHERE datetime(?) <= interval_start AND interval_start <= datetime(?)", 345 since, before) 346} 347 348// GetDailySatelliteRollups returns slice of daily bandwidth usage for provided time range, 349// sorted in ascending order for a particular satellite. 350func (db *bandwidthDB) GetDailySatelliteRollups(ctx context.Context, satelliteID storj.NodeID, from, to time.Time) (_ []bandwidth.UsageRollup, err error) { 351 defer mon.Task()(&ctx, satelliteID, from, to)(&err) 352 353 since, _ := date.DayBoundary(from.UTC()) 354 _, before := date.DayBoundary(to.UTC()) 355 356 return db.getDailyUsageRollups(ctx, 357 "WHERE satellite_id = ? AND datetime(?) <= interval_start AND interval_start <= datetime(?)", 358 satelliteID, since, before) 359} 360 361// getDailyUsageRollups returns slice of grouped by date bandwidth usage rollups 362// sorted in ascending order and applied condition if any. 363func (db *bandwidthDB) getDailyUsageRollups(ctx context.Context, cond string, args ...interface{}) (_ []bandwidth.UsageRollup, err error) { 364 defer mon.Task()(&ctx)(&err) 365 366 query := `SELECT action, sum(a) as amount, DATETIME(DATE(interval_start)) as date FROM ( 367 SELECT action, sum(amount) as a, created_at AS interval_start 368 FROM bandwidth_usage 369 ` + cond + ` 370 GROUP BY interval_start, action 371 UNION ALL 372 SELECT action, sum(amount) as a, interval_start 373 FROM bandwidth_usage_rollups 374 ` + cond + ` 375 GROUP BY interval_start, action 376 ) GROUP BY date, action 377 ORDER BY interval_start` 378 379 // duplicate args as they are used twice 380 args = append(args, args...) 381 382 rows, err := db.QueryContext(ctx, query, args...) 383 if err != nil { 384 return nil, ErrBandwidth.Wrap(err) 385 } 386 defer func() { 387 err = ErrBandwidth.Wrap(errs.Combine(err, rows.Close())) 388 }() 389 390 var dates []time.Time 391 usageRollupsByDate := make(map[time.Time]*bandwidth.UsageRollup) 392 393 for rows.Next() { 394 var action int32 395 var amount int64 396 var intervalStartN dbutil.NullTime 397 398 err = rows.Scan(&action, &amount, &intervalStartN) 399 if err != nil { 400 return nil, err 401 } 402 403 intervalStart := intervalStartN.Time 404 405 rollup, ok := usageRollupsByDate[intervalStart] 406 if !ok { 407 rollup = &bandwidth.UsageRollup{ 408 IntervalStart: intervalStart, 409 } 410 411 dates = append(dates, intervalStart) 412 usageRollupsByDate[intervalStart] = rollup 413 } 414 415 switch pb.PieceAction(action) { 416 case pb.PieceAction_GET: 417 rollup.Egress.Usage = amount 418 case pb.PieceAction_GET_AUDIT: 419 rollup.Egress.Audit = amount 420 case pb.PieceAction_GET_REPAIR: 421 rollup.Egress.Repair = amount 422 case pb.PieceAction_PUT: 423 rollup.Ingress.Usage = amount 424 case pb.PieceAction_PUT_REPAIR: 425 rollup.Ingress.Repair = amount 426 case pb.PieceAction_DELETE: 427 rollup.Delete = amount 428 } 429 } 430 431 var usageRollups []bandwidth.UsageRollup 432 for _, d := range dates { 433 usageRollups = append(usageRollups, *usageRollupsByDate[d]) 434 } 435 436 return usageRollups, ErrBandwidth.Wrap(rows.Err()) 437} 438 439func getBeginningOfMonth(now time.Time) time.Time { 440 y, m, _ := now.UTC().Date() 441 return time.Date(y, m, 1, 0, 0, 0, 0, time.UTC) 442} 443