1// Copyright (C) 2019 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package satellitedb 5 6import ( 7 "bytes" 8 "context" 9 "database/sql" 10 "errors" 11 "sort" 12 "time" 13 14 "github.com/zeebo/errs" 15 16 "storj.io/common/storj" 17 "storj.io/common/uuid" 18 "storj.io/private/dbutil" 19 "storj.io/private/dbutil/pgutil" 20 "storj.io/private/tagsql" 21 "storj.io/storj/satellite/gracefulexit" 22 "storj.io/storj/satellite/metabase" 23 "storj.io/storj/satellite/satellitedb/dbx" 24) 25 26type gracefulexitDB struct { 27 db *satelliteDB 28} 29 30const ( 31 deleteExitProgressBatchSize = 1000 32) 33 34// IncrementProgress increments transfer stats for a node. 35func (db *gracefulexitDB) IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) (err error) { 36 defer mon.Task()(&ctx)(&err) 37 38 statement := db.db.Rebind( 39 `INSERT INTO graceful_exit_progress (node_id, bytes_transferred, pieces_transferred, pieces_failed, updated_at) VALUES (?, ?, ?, ?, ?) 40 ON CONFLICT(node_id) 41 DO UPDATE SET bytes_transferred = graceful_exit_progress.bytes_transferred + excluded.bytes_transferred, 42 pieces_transferred = graceful_exit_progress.pieces_transferred + excluded.pieces_transferred, 43 pieces_failed = graceful_exit_progress.pieces_failed + excluded.pieces_failed, 44 updated_at = excluded.updated_at;`, 45 ) 46 now := time.Now().UTC() 47 _, err = db.db.ExecContext(ctx, statement, nodeID, bytes, successfulTransfers, failedTransfers, now) 48 if err != nil { 49 return Error.Wrap(err) 50 } 51 52 return nil 53} 54 55// GetProgress gets a graceful exit progress entry. 56func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID) (_ *gracefulexit.Progress, err error) { 57 defer mon.Task()(&ctx)(&err) 58 dbxProgress, err := db.db.Get_GracefulExitProgress_By_NodeId(ctx, dbx.GracefulExitProgress_NodeId(nodeID.Bytes())) 59 if errors.Is(err, sql.ErrNoRows) { 60 return nil, gracefulexit.ErrNodeNotFound.Wrap(err) 61 } else if err != nil { 62 return nil, Error.Wrap(err) 63 } 64 nID, err := storj.NodeIDFromBytes(dbxProgress.NodeId) 65 if err != nil { 66 return nil, Error.Wrap(err) 67 } 68 69 progress := &gracefulexit.Progress{ 70 NodeID: nID, 71 BytesTransferred: dbxProgress.BytesTransferred, 72 PiecesTransferred: dbxProgress.PiecesTransferred, 73 PiecesFailed: dbxProgress.PiecesFailed, 74 UpdatedAt: dbxProgress.UpdatedAt, 75 } 76 77 return progress, Error.Wrap(err) 78} 79 80// Enqueue batch inserts graceful exit transfer queue entries if it does not exist. 81func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) { 82 defer mon.Task()(&ctx)(&err) 83 84 sort.Slice(items, func(i, k int) bool { 85 compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes()) 86 if compare == 0 { 87 compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:]) 88 if compare == 0 { 89 return items[i].Position.Encode() < items[k].Position.Encode() 90 } 91 return compare < 0 92 } 93 return compare < 0 94 }) 95 96 for i := 0; i < len(items); i += batchSize { 97 lowerBound := i 98 upperBound := lowerBound + batchSize 99 100 if upperBound > len(items) { 101 upperBound = len(items) 102 } 103 104 var nodeIDs []storj.NodeID 105 var streamIds [][]byte 106 var positions []int64 107 var pieceNums []int32 108 var rootPieceIDs [][]byte 109 var durabilities []float64 110 111 for _, item := range items[lowerBound:upperBound] { 112 item := item 113 nodeIDs = append(nodeIDs, item.NodeID) 114 streamIds = append(streamIds, item.StreamID[:]) 115 positions = append(positions, int64(item.Position.Encode())) 116 pieceNums = append(pieceNums, item.PieceNum) 117 rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes()) 118 durabilities = append(durabilities, item.DurabilityRatio) 119 } 120 121 _, err = db.db.ExecContext(ctx, db.db.Rebind(` 122 INSERT INTO graceful_exit_segment_transfer_queue ( 123 node_id, stream_id, position, piece_num, 124 root_piece_id, durability_ratio, queued_at 125 ) SELECT 126 unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int8[]), 127 unnest($4::int4[]), unnest($5::bytea[]), unnest($6::float8[]), 128 $7 129 ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(streamIds), pgutil.Int8Array(positions), 130 pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities), 131 time.Now().UTC()) 132 133 if err != nil { 134 return Error.Wrap(err) 135 } 136 137 } 138 return nil 139} 140 141// UpdateTransferQueueItem creates a graceful exit transfer queue entry. 142func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem) (err error) { 143 defer mon.Task()(&ctx)(&err) 144 145 update := dbx.GracefulExitSegmentTransfer_Update_Fields{ 146 DurabilityRatio: dbx.GracefulExitSegmentTransfer_DurabilityRatio(item.DurabilityRatio), 147 LastFailedCode: dbx.GracefulExitSegmentTransfer_LastFailedCode_Raw(item.LastFailedCode), 148 FailedCount: dbx.GracefulExitSegmentTransfer_FailedCount_Raw(item.FailedCount), 149 } 150 151 if item.RequestedAt != nil { 152 update.RequestedAt = dbx.GracefulExitSegmentTransfer_RequestedAt_Raw(item.RequestedAt) 153 } 154 if item.LastFailedAt != nil { 155 update.LastFailedAt = dbx.GracefulExitSegmentTransfer_LastFailedAt_Raw(item.LastFailedAt) 156 } 157 if item.FinishedAt != nil { 158 update.FinishedAt = dbx.GracefulExitSegmentTransfer_FinishedAt_Raw(item.FinishedAt) 159 } 160 161 return db.db.UpdateNoReturn_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx, 162 dbx.GracefulExitSegmentTransfer_NodeId(item.NodeID.Bytes()), 163 dbx.GracefulExitSegmentTransfer_StreamId(item.StreamID[:]), 164 dbx.GracefulExitSegmentTransfer_Position(item.Position.Encode()), 165 dbx.GracefulExitSegmentTransfer_PieceNum(int(item.PieceNum)), 166 update, 167 ) 168} 169 170// DeleteTransferQueueItem deletes a graceful exit transfer queue entry. 171func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) { 172 defer mon.Task()(&ctx)(&err) 173 174 _, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx, 175 dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()), 176 dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]), 177 dbx.GracefulExitSegmentTransfer_Position(position.Encode()), dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum))) 178 179 return Error.Wrap(err) 180} 181 182// DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID. 183func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) { 184 defer mon.Task()(&ctx)(&err) 185 186 _, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes())) 187 return Error.Wrap(err) 188 189} 190 191// DeleteFinishedTransferQueueItem deletes finished graceful exit transfer queue entries by nodeID. 192func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) { 193 defer mon.Task()(&ctx)(&err) 194 195 _, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes())) 196 return Error.Wrap(err) 197} 198 199// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer 200// queue items whose nodes have finished the exit before the indicated time 201// returning the total number of deleted items. 202func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems( 203 ctx context.Context, before time.Time, asOfSystemInterval time.Duration, batchSize int) (_ int64, err error) { 204 defer mon.Task()(&ctx)(&err) 205 206 switch db.db.impl { 207 case dbutil.Postgres: 208 statement := ` 209 DELETE FROM graceful_exit_segment_transfer_queue 210 WHERE node_id IN ( 211 SELECT node_id FROM graceful_exit_segment_transfer_queue INNER JOIN nodes 212 ON graceful_exit_segment_transfer_queue.node_id = nodes.id 213 WHERE nodes.exit_finished_at IS NOT NULL 214 AND nodes.exit_finished_at < $1 215 )` 216 res, err := db.db.ExecContext(ctx, statement, before) 217 if err != nil { 218 return 0, Error.Wrap(err) 219 } 220 221 count, err := res.RowsAffected() 222 if err != nil { 223 return 0, Error.Wrap(err) 224 } 225 226 return count, nil 227 228 case dbutil.Cockroach: 229 nodesQuery := ` 230 SELECT id 231 FROM nodes 232 ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + ` 233 WHERE exit_finished_at IS NOT NULL 234 AND exit_finished_at < $1 235 LIMIT $2 OFFSET $3 236 ` 237 deleteStmt := ` 238 DELETE FROM graceful_exit_segment_transfer_queue 239 WHERE node_id = $1 240 LIMIT $2 241 ` 242 243 var ( 244 deleteCount int64 245 offset int 246 ) 247 for { 248 var nodeIDs storj.NodeIDList 249 deleteItems := func() (int64, error) { 250 // Select exited nodes 251 rows, err := db.db.QueryContext(ctx, nodesQuery, before, batchSize, offset) 252 if err != nil { 253 return deleteCount, Error.Wrap(err) 254 } 255 defer func() { err = errs.Combine(err, rows.Close()) }() 256 257 count := 0 258 for rows.Next() { 259 var id storj.NodeID 260 if err = rows.Scan(&id); err != nil { 261 return deleteCount, Error.Wrap(err) 262 } 263 nodeIDs = append(nodeIDs, id) 264 count++ 265 } 266 267 if count == batchSize { 268 offset += count 269 } else { 270 offset = -1 // indicates that there aren't more nodes to query 271 } 272 273 for _, id := range nodeIDs { 274 for { 275 res, err := db.db.ExecContext(ctx, deleteStmt, id.Bytes(), batchSize) 276 if err != nil { 277 return deleteCount, Error.Wrap(err) 278 } 279 count, err := res.RowsAffected() 280 if err != nil { 281 return deleteCount, Error.Wrap(err) 282 } 283 deleteCount += count 284 if count < int64(batchSize) { 285 break 286 } 287 } 288 } 289 return deleteCount, nil 290 } 291 deleteCount, err = deleteItems() 292 if err != nil { 293 return deleteCount, err 294 } 295 // when offset is negative means that we have get already all the nodes 296 // which have exited 297 if offset < 0 { 298 break 299 } 300 } 301 return deleteCount, nil 302 } 303 304 return 0, Error.New("unsupported implementation: %s", db.db.impl) 305} 306 307// DeleteFinishedExitProgress deletes exit progress entries for nodes that 308// finished exiting before the indicated time, returns number of deleted entries. 309func (db *gracefulexitDB) DeleteFinishedExitProgress( 310 ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ int64, err error) { 311 defer mon.Task()(&ctx)(&err) 312 313 finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemInterval) 314 if err != nil { 315 return 0, err 316 } 317 return db.DeleteBatchExitProgress(ctx, finishedNodes) 318} 319 320// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time. 321func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (finishedNodes []storj.NodeID, err error) { 322 defer mon.Task()(&ctx)(&err) 323 stmt := ` 324 SELECT id 325 FROM nodes 326 ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + ` 327 WHERE exit_finished_at IS NOT NULL 328 AND exit_finished_at < ? 329 ` 330 rows, err := db.db.Query(ctx, db.db.Rebind(stmt), before.UTC()) 331 if err != nil { 332 return nil, Error.Wrap(err) 333 } 334 defer func() { 335 err = Error.Wrap(errs.Combine(err, rows.Close())) 336 }() 337 338 for rows.Next() { 339 var id storj.NodeID 340 err = rows.Scan(&id) 341 if err != nil { 342 return nil, Error.Wrap(err) 343 } 344 finishedNodes = append(finishedNodes, id) 345 } 346 return finishedNodes, Error.Wrap(rows.Err()) 347} 348 349// DeleteBatchExitProgress batch deletes from exit progress. This is separate from 350// getting the node IDs because the combined query is slow in CRDB. It's safe to do 351// separately because if nodes are deleted between the get and delete, it doesn't 352// affect correctness. 353func (db *gracefulexitDB) DeleteBatchExitProgress(ctx context.Context, nodeIDs []storj.NodeID) (deleted int64, err error) { 354 defer mon.Task()(&ctx)(&err) 355 stmt := `DELETE from graceful_exit_progress 356 WHERE node_id = ANY($1)` 357 for len(nodeIDs) > 0 { 358 numToSubmit := len(nodeIDs) 359 if numToSubmit > deleteExitProgressBatchSize { 360 numToSubmit = deleteExitProgressBatchSize 361 } 362 nodesToSubmit := nodeIDs[:numToSubmit] 363 res, err := db.db.ExecContext(ctx, stmt, pgutil.NodeIDArray(nodesToSubmit)) 364 if err != nil { 365 return deleted, Error.Wrap(err) 366 } 367 count, err := res.RowsAffected() 368 if err != nil { 369 return deleted, Error.Wrap(err) 370 } 371 deleted += count 372 nodeIDs = nodeIDs[numToSubmit:] 373 } 374 return deleted, Error.Wrap(err) 375} 376 377// GetTransferQueueItem gets a graceful exit transfer queue entry. 378func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) { 379 defer mon.Task()(&ctx)(&err) 380 381 dbxTransferQueue, err := db.db.Get_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx, 382 dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()), 383 dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]), 384 dbx.GracefulExitSegmentTransfer_Position(position.Encode()), 385 dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum))) 386 387 if err != nil { 388 return nil, Error.Wrap(err) 389 } 390 transferQueueItem, err := dbxSegmentTransferToTransferQueueItem(dbxTransferQueue) 391 if err != nil { 392 return nil, Error.Wrap(err) 393 } 394 395 return transferQueueItem, Error.Wrap(err) 396 397} 398 399// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. 400func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { 401 defer mon.Task()(&ctx)(&err) 402 403 sql := ` 404 SELECT 405 node_id, stream_id, position, 406 piece_num, root_piece_id, durability_ratio, 407 queued_at, requested_at, last_failed_at, 408 last_failed_code, failed_count, finished_at, 409 order_limit_send_count 410 FROM graceful_exit_segment_transfer_queue 411 WHERE node_id = ? 412 AND finished_at is NULL 413 ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` 414 rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset) 415 if err != nil { 416 return nil, Error.Wrap(err) 417 } 418 defer func() { err = errs.Combine(err, rows.Close()) }() 419 420 transferQueueItemRows, err := scanRows(rows) 421 if err != nil { 422 return nil, Error.Wrap(err) 423 } 424 425 return transferQueueItemRows, nil 426} 427 428// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending. 429func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { 430 defer mon.Task()(&ctx)(&err) 431 432 sql := ` 433 SELECT 434 node_id, stream_id, position, 435 piece_num, root_piece_id, durability_ratio, 436 queued_at, requested_at, last_failed_at, 437 last_failed_code, failed_count, finished_at, 438 order_limit_send_count 439 FROM graceful_exit_segment_transfer_queue 440 WHERE node_id = ? 441 AND finished_at is NULL 442 AND last_failed_at is NULL 443 ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` 444 rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset) 445 if err != nil { 446 return nil, Error.Wrap(err) 447 } 448 defer func() { err = errs.Combine(err, rows.Close()) }() 449 450 transferQueueItemRows, err := scanRows(rows) 451 if err != nil { 452 return nil, Error.Wrap(err) 453 } 454 455 return transferQueueItemRows, nil 456} 457 458// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending. 459func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) { 460 defer mon.Task()(&ctx)(&err) 461 462 sql := ` 463 SELECT 464 node_id, stream_id, position, 465 piece_num, root_piece_id, durability_ratio, 466 queued_at, requested_at, last_failed_at, 467 last_failed_code, failed_count, finished_at, 468 order_limit_send_count 469 FROM graceful_exit_segment_transfer_queue 470 WHERE node_id = ? 471 AND finished_at is NULL 472 AND last_failed_at is not NULL 473 AND failed_count < ? 474 ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?` 475 rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), maxFailures, limit, offset) 476 if err != nil { 477 return nil, Error.Wrap(err) 478 } 479 defer func() { err = errs.Combine(err, rows.Close()) }() 480 481 transferQueueItemRows, err := scanRows(rows) 482 if err != nil { 483 return nil, Error.Wrap(err) 484 } 485 486 return transferQueueItemRows, nil 487} 488 489// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring. 490func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) { 491 defer mon.Task()(&ctx)(&err) 492 493 sql := `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1 494 WHERE node_id = ? 495 AND stream_id = ? 496 AND position = ? 497 AND piece_num = ?` 498 _, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum) 499 500 return Error.Wrap(err) 501} 502 503// CountFinishedTransferQueueItemsByNode return a map of the nodes which has 504// finished the exit before the indicated time but there are at least one item 505// left in the transfer queue. 506func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ map[storj.NodeID]int64, err error) { 507 defer mon.Task()(&ctx)(&err) 508 509 query := `SELECT n.id, count(getq.node_id) 510 FROM nodes as n INNER JOIN graceful_exit_segment_transfer_queue as getq 511 ON n.id = getq.node_id 512 ` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + ` 513 WHERE n.exit_finished_at IS NOT NULL 514 AND n.exit_finished_at < ? 515 GROUP BY n.id` 516 517 statement := db.db.Rebind(query) 518 519 rows, err := db.db.QueryContext(ctx, statement, before) 520 if err != nil { 521 return nil, Error.Wrap(err) 522 } 523 defer func() { err = errs.Combine(err, Error.Wrap(rows.Close())) }() 524 525 nodesItemsCount := make(map[storj.NodeID]int64) 526 for rows.Next() { 527 var ( 528 nodeID storj.NodeID 529 n int64 530 ) 531 err := rows.Scan(&nodeID, &n) 532 if err != nil { 533 return nil, Error.Wrap(err) 534 } 535 536 nodesItemsCount[nodeID] = n 537 } 538 539 return nodesItemsCount, Error.Wrap(rows.Err()) 540} 541 542func scanRows(rows tagsql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) { 543 for rows.Next() { 544 transferQueueItem := &gracefulexit.TransferQueueItem{} 545 var pieceIDBytes []byte 546 err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes, 547 &transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt, 548 &transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount) 549 550 if err != nil { 551 return nil, Error.Wrap(err) 552 } 553 if pieceIDBytes != nil { 554 transferQueueItem.RootPieceID, err = storj.PieceIDFromBytes(pieceIDBytes) 555 if err != nil { 556 return nil, Error.Wrap(err) 557 } 558 } 559 560 transferQueueItemRows = append(transferQueueItemRows, transferQueueItem) 561 } 562 return transferQueueItemRows, Error.Wrap(rows.Err()) 563} 564 565func dbxSegmentTransferToTransferQueueItem(dbxSegmentTransfer *dbx.GracefulExitSegmentTransfer) (item *gracefulexit.TransferQueueItem, err error) { 566 nID, err := storj.NodeIDFromBytes(dbxSegmentTransfer.NodeId) 567 if err != nil { 568 return nil, Error.Wrap(err) 569 } 570 571 streamID, err := uuid.FromBytes(dbxSegmentTransfer.StreamId) 572 if err != nil { 573 return nil, Error.Wrap(err) 574 } 575 576 position := metabase.SegmentPositionFromEncoded(dbxSegmentTransfer.Position) 577 578 item = &gracefulexit.TransferQueueItem{ 579 NodeID: nID, 580 StreamID: streamID, 581 Position: position, 582 PieceNum: int32(dbxSegmentTransfer.PieceNum), 583 DurabilityRatio: dbxSegmentTransfer.DurabilityRatio, 584 QueuedAt: dbxSegmentTransfer.QueuedAt, 585 OrderLimitSendCount: dbxSegmentTransfer.OrderLimitSendCount, 586 } 587 if dbxSegmentTransfer.RootPieceId != nil { 588 item.RootPieceID, err = storj.PieceIDFromBytes(dbxSegmentTransfer.RootPieceId) 589 if err != nil { 590 return nil, err 591 } 592 } 593 if dbxSegmentTransfer.LastFailedCode != nil { 594 item.LastFailedCode = dbxSegmentTransfer.LastFailedCode 595 } 596 if dbxSegmentTransfer.FailedCount != nil { 597 item.FailedCount = dbxSegmentTransfer.FailedCount 598 } 599 if dbxSegmentTransfer.RequestedAt != nil && !dbxSegmentTransfer.RequestedAt.IsZero() { 600 item.RequestedAt = dbxSegmentTransfer.RequestedAt 601 } 602 if dbxSegmentTransfer.LastFailedAt != nil && !dbxSegmentTransfer.LastFailedAt.IsZero() { 603 item.LastFailedAt = dbxSegmentTransfer.LastFailedAt 604 } 605 if dbxSegmentTransfer.FinishedAt != nil && !dbxSegmentTransfer.FinishedAt.IsZero() { 606 item.FinishedAt = dbxSegmentTransfer.FinishedAt 607 } 608 609 return item, nil 610} 611