1// Copyright (C) 2019 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package satellitedb
5
6import (
7	"bytes"
8	"context"
9	"database/sql"
10	"errors"
11	"sort"
12	"time"
13
14	"github.com/zeebo/errs"
15
16	"storj.io/common/storj"
17	"storj.io/common/uuid"
18	"storj.io/private/dbutil"
19	"storj.io/private/dbutil/pgutil"
20	"storj.io/private/tagsql"
21	"storj.io/storj/satellite/gracefulexit"
22	"storj.io/storj/satellite/metabase"
23	"storj.io/storj/satellite/satellitedb/dbx"
24)
25
// gracefulexitDB implements graceful exit database methods backed by the
// satellite database.
type gracefulexitDB struct {
	db *satelliteDB
}
29
const (
	// deleteExitProgressBatchSize limits how many node IDs are submitted per
	// DELETE statement in DeleteBatchExitProgress.
	deleteExitProgressBatchSize = 1000
)
33
34// IncrementProgress increments transfer stats for a node.
35func (db *gracefulexitDB) IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) (err error) {
36	defer mon.Task()(&ctx)(&err)
37
38	statement := db.db.Rebind(
39		`INSERT INTO graceful_exit_progress (node_id, bytes_transferred, pieces_transferred, pieces_failed, updated_at) VALUES (?, ?, ?, ?, ?)
40		 ON CONFLICT(node_id)
41		 DO UPDATE SET bytes_transferred = graceful_exit_progress.bytes_transferred + excluded.bytes_transferred,
42		 	pieces_transferred = graceful_exit_progress.pieces_transferred + excluded.pieces_transferred,
43		 	pieces_failed = graceful_exit_progress.pieces_failed + excluded.pieces_failed,
44		 	updated_at = excluded.updated_at;`,
45	)
46	now := time.Now().UTC()
47	_, err = db.db.ExecContext(ctx, statement, nodeID, bytes, successfulTransfers, failedTransfers, now)
48	if err != nil {
49		return Error.Wrap(err)
50	}
51
52	return nil
53}
54
55// GetProgress gets a graceful exit progress entry.
56func (db *gracefulexitDB) GetProgress(ctx context.Context, nodeID storj.NodeID) (_ *gracefulexit.Progress, err error) {
57	defer mon.Task()(&ctx)(&err)
58	dbxProgress, err := db.db.Get_GracefulExitProgress_By_NodeId(ctx, dbx.GracefulExitProgress_NodeId(nodeID.Bytes()))
59	if errors.Is(err, sql.ErrNoRows) {
60		return nil, gracefulexit.ErrNodeNotFound.Wrap(err)
61	} else if err != nil {
62		return nil, Error.Wrap(err)
63	}
64	nID, err := storj.NodeIDFromBytes(dbxProgress.NodeId)
65	if err != nil {
66		return nil, Error.Wrap(err)
67	}
68
69	progress := &gracefulexit.Progress{
70		NodeID:            nID,
71		BytesTransferred:  dbxProgress.BytesTransferred,
72		PiecesTransferred: dbxProgress.PiecesTransferred,
73		PiecesFailed:      dbxProgress.PiecesFailed,
74		UpdatedAt:         dbxProgress.UpdatedAt,
75	}
76
77	return progress, Error.Wrap(err)
78}
79
80// Enqueue batch inserts graceful exit transfer queue entries if it does not exist.
81func (db *gracefulexitDB) Enqueue(ctx context.Context, items []gracefulexit.TransferQueueItem, batchSize int) (err error) {
82	defer mon.Task()(&ctx)(&err)
83
84	sort.Slice(items, func(i, k int) bool {
85		compare := bytes.Compare(items[i].NodeID.Bytes(), items[k].NodeID.Bytes())
86		if compare == 0 {
87			compare = bytes.Compare(items[i].StreamID[:], items[k].StreamID[:])
88			if compare == 0 {
89				return items[i].Position.Encode() < items[k].Position.Encode()
90			}
91			return compare < 0
92		}
93		return compare < 0
94	})
95
96	for i := 0; i < len(items); i += batchSize {
97		lowerBound := i
98		upperBound := lowerBound + batchSize
99
100		if upperBound > len(items) {
101			upperBound = len(items)
102		}
103
104		var nodeIDs []storj.NodeID
105		var streamIds [][]byte
106		var positions []int64
107		var pieceNums []int32
108		var rootPieceIDs [][]byte
109		var durabilities []float64
110
111		for _, item := range items[lowerBound:upperBound] {
112			item := item
113			nodeIDs = append(nodeIDs, item.NodeID)
114			streamIds = append(streamIds, item.StreamID[:])
115			positions = append(positions, int64(item.Position.Encode()))
116			pieceNums = append(pieceNums, item.PieceNum)
117			rootPieceIDs = append(rootPieceIDs, item.RootPieceID.Bytes())
118			durabilities = append(durabilities, item.DurabilityRatio)
119		}
120
121		_, err = db.db.ExecContext(ctx, db.db.Rebind(`
122			INSERT INTO graceful_exit_segment_transfer_queue (
123				node_id, stream_id, position, piece_num,
124				root_piece_id, durability_ratio, queued_at
125			) SELECT
126				unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::int8[]),
127				unnest($4::int4[]), unnest($5::bytea[]), unnest($6::float8[]),
128				$7
129			ON CONFLICT DO NOTHING;`), pgutil.NodeIDArray(nodeIDs), pgutil.ByteaArray(streamIds), pgutil.Int8Array(positions),
130			pgutil.Int4Array(pieceNums), pgutil.ByteaArray(rootPieceIDs), pgutil.Float8Array(durabilities),
131			time.Now().UTC())
132
133		if err != nil {
134			return Error.Wrap(err)
135		}
136
137	}
138	return nil
139}
140
141// UpdateTransferQueueItem creates a graceful exit transfer queue entry.
142func (db *gracefulexitDB) UpdateTransferQueueItem(ctx context.Context, item gracefulexit.TransferQueueItem) (err error) {
143	defer mon.Task()(&ctx)(&err)
144
145	update := dbx.GracefulExitSegmentTransfer_Update_Fields{
146		DurabilityRatio: dbx.GracefulExitSegmentTransfer_DurabilityRatio(item.DurabilityRatio),
147		LastFailedCode:  dbx.GracefulExitSegmentTransfer_LastFailedCode_Raw(item.LastFailedCode),
148		FailedCount:     dbx.GracefulExitSegmentTransfer_FailedCount_Raw(item.FailedCount),
149	}
150
151	if item.RequestedAt != nil {
152		update.RequestedAt = dbx.GracefulExitSegmentTransfer_RequestedAt_Raw(item.RequestedAt)
153	}
154	if item.LastFailedAt != nil {
155		update.LastFailedAt = dbx.GracefulExitSegmentTransfer_LastFailedAt_Raw(item.LastFailedAt)
156	}
157	if item.FinishedAt != nil {
158		update.FinishedAt = dbx.GracefulExitSegmentTransfer_FinishedAt_Raw(item.FinishedAt)
159	}
160
161	return db.db.UpdateNoReturn_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
162		dbx.GracefulExitSegmentTransfer_NodeId(item.NodeID.Bytes()),
163		dbx.GracefulExitSegmentTransfer_StreamId(item.StreamID[:]),
164		dbx.GracefulExitSegmentTransfer_Position(item.Position.Encode()),
165		dbx.GracefulExitSegmentTransfer_PieceNum(int(item.PieceNum)),
166		update,
167	)
168}
169
170// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
171func (db *gracefulexitDB) DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
172	defer mon.Task()(&ctx)(&err)
173
174	_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
175		dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
176		dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
177		dbx.GracefulExitSegmentTransfer_Position(position.Encode()), dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum)))
178
179	return Error.Wrap(err)
180}
181
// DeleteTransferQueueItems deletes graceful exit transfer queue entries by nodeID.
func (db *gracefulexitDB) DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
	return Error.Wrap(err)

}
190
// DeleteFinishedTransferQueueItems deletes finished graceful exit transfer queue entries by nodeID.
func (db *gracefulexitDB) DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)

	_, err = db.db.Delete_GracefulExitSegmentTransfer_By_NodeId_And_FinishedAt_IsNot_Null(ctx, dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()))
	return Error.Wrap(err)
}
198
199// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
200// queue items whose nodes have finished the exit before the indicated time
201// returning the total number of deleted items.
202func (db *gracefulexitDB) DeleteAllFinishedTransferQueueItems(
203	ctx context.Context, before time.Time, asOfSystemInterval time.Duration, batchSize int) (_ int64, err error) {
204	defer mon.Task()(&ctx)(&err)
205
206	switch db.db.impl {
207	case dbutil.Postgres:
208		statement := `
209			DELETE FROM graceful_exit_segment_transfer_queue
210			WHERE node_id IN (
211				SELECT node_id FROM graceful_exit_segment_transfer_queue INNER JOIN nodes
212					ON graceful_exit_segment_transfer_queue.node_id = nodes.id
213				WHERE nodes.exit_finished_at IS NOT NULL
214				AND nodes.exit_finished_at < $1
215			)`
216		res, err := db.db.ExecContext(ctx, statement, before)
217		if err != nil {
218			return 0, Error.Wrap(err)
219		}
220
221		count, err := res.RowsAffected()
222		if err != nil {
223			return 0, Error.Wrap(err)
224		}
225
226		return count, nil
227
228	case dbutil.Cockroach:
229		nodesQuery := `
230			SELECT id
231			FROM nodes
232		` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
233			WHERE exit_finished_at IS NOT NULL
234				AND exit_finished_at < $1
235			LIMIT $2 OFFSET $3
236		`
237		deleteStmt := `
238			DELETE FROM graceful_exit_segment_transfer_queue
239			WHERE node_id = $1
240			LIMIT $2
241		`
242
243		var (
244			deleteCount int64
245			offset      int
246		)
247		for {
248			var nodeIDs storj.NodeIDList
249			deleteItems := func() (int64, error) {
250				// Select exited nodes
251				rows, err := db.db.QueryContext(ctx, nodesQuery, before, batchSize, offset)
252				if err != nil {
253					return deleteCount, Error.Wrap(err)
254				}
255				defer func() { err = errs.Combine(err, rows.Close()) }()
256
257				count := 0
258				for rows.Next() {
259					var id storj.NodeID
260					if err = rows.Scan(&id); err != nil {
261						return deleteCount, Error.Wrap(err)
262					}
263					nodeIDs = append(nodeIDs, id)
264					count++
265				}
266
267				if count == batchSize {
268					offset += count
269				} else {
270					offset = -1 // indicates that there aren't more nodes to query
271				}
272
273				for _, id := range nodeIDs {
274					for {
275						res, err := db.db.ExecContext(ctx, deleteStmt, id.Bytes(), batchSize)
276						if err != nil {
277							return deleteCount, Error.Wrap(err)
278						}
279						count, err := res.RowsAffected()
280						if err != nil {
281							return deleteCount, Error.Wrap(err)
282						}
283						deleteCount += count
284						if count < int64(batchSize) {
285							break
286						}
287					}
288				}
289				return deleteCount, nil
290			}
291			deleteCount, err = deleteItems()
292			if err != nil {
293				return deleteCount, err
294			}
295			// when offset is negative means that we have get already all the nodes
296			// which have exited
297			if offset < 0 {
298				break
299			}
300		}
301		return deleteCount, nil
302	}
303
304	return 0, Error.New("unsupported implementation: %s", db.db.impl)
305}
306
307// DeleteFinishedExitProgress deletes exit progress entries for nodes that
308// finished exiting before the indicated time, returns number of deleted entries.
309func (db *gracefulexitDB) DeleteFinishedExitProgress(
310	ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ int64, err error) {
311	defer mon.Task()(&ctx)(&err)
312
313	finishedNodes, err := db.GetFinishedExitNodes(ctx, before, asOfSystemInterval)
314	if err != nil {
315		return 0, err
316	}
317	return db.DeleteBatchExitProgress(ctx, finishedNodes)
318}
319
// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
func (db *gracefulexitDB) GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (finishedNodes []storj.NodeID, err error) {
	defer mon.Task()(&ctx)(&err)
	// AsOfSystemInterval injects an implementation-specific stale-read clause.
	stmt := `
		SELECT id
		FROM nodes
		` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
        WHERE exit_finished_at IS NOT NULL
	    AND exit_finished_at < ?
		`
	rows, err := db.db.Query(ctx, db.db.Rebind(stmt), before.UTC())
	if err != nil {
		return nil, Error.Wrap(err)
	}
	// Fold any rows.Close error into the named return value.
	defer func() {
		err = Error.Wrap(errs.Combine(err, rows.Close()))
	}()

	for rows.Next() {
		var id storj.NodeID
		err = rows.Scan(&id)
		if err != nil {
			return nil, Error.Wrap(err)
		}
		finishedNodes = append(finishedNodes, id)
	}
	// rows.Err reports any iteration error that ended rows.Next early.
	return finishedNodes, Error.Wrap(rows.Err())
}
348
349// DeleteBatchExitProgress batch deletes from exit progress. This is separate from
350// getting the node IDs because the combined query is slow in CRDB. It's safe to do
351// separately because if nodes are deleted between the get and delete, it doesn't
352// affect correctness.
353func (db *gracefulexitDB) DeleteBatchExitProgress(ctx context.Context, nodeIDs []storj.NodeID) (deleted int64, err error) {
354	defer mon.Task()(&ctx)(&err)
355	stmt := `DELETE from graceful_exit_progress
356			WHERE node_id = ANY($1)`
357	for len(nodeIDs) > 0 {
358		numToSubmit := len(nodeIDs)
359		if numToSubmit > deleteExitProgressBatchSize {
360			numToSubmit = deleteExitProgressBatchSize
361		}
362		nodesToSubmit := nodeIDs[:numToSubmit]
363		res, err := db.db.ExecContext(ctx, stmt, pgutil.NodeIDArray(nodesToSubmit))
364		if err != nil {
365			return deleted, Error.Wrap(err)
366		}
367		count, err := res.RowsAffected()
368		if err != nil {
369			return deleted, Error.Wrap(err)
370		}
371		deleted += count
372		nodeIDs = nodeIDs[numToSubmit:]
373	}
374	return deleted, Error.Wrap(err)
375}
376
377// GetTransferQueueItem gets a graceful exit transfer queue entry.
378func (db *gracefulexitDB) GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (_ *gracefulexit.TransferQueueItem, err error) {
379	defer mon.Task()(&ctx)(&err)
380
381	dbxTransferQueue, err := db.db.Get_GracefulExitSegmentTransfer_By_NodeId_And_StreamId_And_Position_And_PieceNum(ctx,
382		dbx.GracefulExitSegmentTransfer_NodeId(nodeID.Bytes()),
383		dbx.GracefulExitSegmentTransfer_StreamId(streamID[:]),
384		dbx.GracefulExitSegmentTransfer_Position(position.Encode()),
385		dbx.GracefulExitSegmentTransfer_PieceNum(int(pieceNum)))
386
387	if err != nil {
388		return nil, Error.Wrap(err)
389	}
390	transferQueueItem, err := dbxSegmentTransferToTransferQueueItem(dbxTransferQueue)
391	if err != nil {
392		return nil, Error.Wrap(err)
393	}
394
395	return transferQueueItem, Error.Wrap(err)
396
397}
398
399// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
400func (db *gracefulexitDB) GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
401	defer mon.Task()(&ctx)(&err)
402
403	sql := `
404			SELECT
405				node_id, stream_id, position,
406				piece_num, root_piece_id, durability_ratio,
407				queued_at, requested_at, last_failed_at,
408				last_failed_code, failed_count, finished_at,
409				order_limit_send_count
410			FROM graceful_exit_segment_transfer_queue
411			WHERE node_id = ?
412			AND finished_at is NULL
413			ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
414	rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
415	if err != nil {
416		return nil, Error.Wrap(err)
417	}
418	defer func() { err = errs.Combine(err, rows.Close()) }()
419
420	transferQueueItemRows, err := scanRows(rows)
421	if err != nil {
422		return nil, Error.Wrap(err)
423	}
424
425	return transferQueueItemRows, nil
426}
427
428// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that haven't failed, ordered by durability ratio and queued date ascending.
429func (db *gracefulexitDB) GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
430	defer mon.Task()(&ctx)(&err)
431
432	sql := `
433			SELECT
434				node_id, stream_id, position,
435				piece_num, root_piece_id, durability_ratio,
436				queued_at, requested_at, last_failed_at,
437				last_failed_code, failed_count, finished_at,
438				order_limit_send_count
439			FROM graceful_exit_segment_transfer_queue
440			WHERE node_id = ?
441			AND finished_at is NULL
442			AND last_failed_at is NULL
443			ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
444	rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), limit, offset)
445	if err != nil {
446		return nil, Error.Wrap(err)
447	}
448	defer func() { err = errs.Combine(err, rows.Close()) }()
449
450	transferQueueItemRows, err := scanRows(rows)
451	if err != nil {
452		return nil, Error.Wrap(err)
453	}
454
455	return transferQueueItemRows, nil
456}
457
458// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
459func (db *gracefulexitDB) GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) (_ []*gracefulexit.TransferQueueItem, err error) {
460	defer mon.Task()(&ctx)(&err)
461
462	sql := `
463			SELECT
464				node_id, stream_id, position,
465				piece_num, root_piece_id, durability_ratio,
466				queued_at, requested_at, last_failed_at,
467				last_failed_code, failed_count, finished_at,
468				order_limit_send_count
469			FROM graceful_exit_segment_transfer_queue
470			WHERE node_id = ?
471				AND finished_at is NULL
472				AND last_failed_at is not NULL
473				AND failed_count < ?
474			ORDER BY durability_ratio asc, queued_at asc LIMIT ? OFFSET ?`
475	rows, err := db.db.Query(ctx, db.db.Rebind(sql), nodeID.Bytes(), maxFailures, limit, offset)
476	if err != nil {
477		return nil, Error.Wrap(err)
478	}
479	defer func() { err = errs.Combine(err, rows.Close()) }()
480
481	transferQueueItemRows, err := scanRows(rows)
482	if err != nil {
483		return nil, Error.Wrap(err)
484	}
485
486	return transferQueueItemRows, nil
487}
488
489// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
490func (db *gracefulexitDB) IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, streamID uuid.UUID, position metabase.SegmentPosition, pieceNum int32) (err error) {
491	defer mon.Task()(&ctx)(&err)
492
493	sql := `UPDATE graceful_exit_segment_transfer_queue SET order_limit_send_count = graceful_exit_segment_transfer_queue.order_limit_send_count + 1
494			WHERE node_id = ?
495			AND stream_id = ?
496			AND position = ?
497			AND piece_num = ?`
498	_, err = db.db.ExecContext(ctx, db.db.Rebind(sql), nodeID, streamID, position.Encode(), pieceNum)
499
500	return Error.Wrap(err)
501}
502
503// CountFinishedTransferQueueItemsByNode return a map of the nodes which has
504// finished the exit before the indicated time but there are at least one item
505// left in the transfer queue.
506func (db *gracefulexitDB) CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemInterval time.Duration) (_ map[storj.NodeID]int64, err error) {
507	defer mon.Task()(&ctx)(&err)
508
509	query := `SELECT n.id, count(getq.node_id)
510		FROM nodes as n INNER JOIN graceful_exit_segment_transfer_queue as getq
511			ON n.id = getq.node_id
512		` + db.db.impl.AsOfSystemInterval(asOfSystemInterval) + `
513		WHERE n.exit_finished_at IS NOT NULL
514			AND n.exit_finished_at < ?
515		GROUP BY n.id`
516
517	statement := db.db.Rebind(query)
518
519	rows, err := db.db.QueryContext(ctx, statement, before)
520	if err != nil {
521		return nil, Error.Wrap(err)
522	}
523	defer func() { err = errs.Combine(err, Error.Wrap(rows.Close())) }()
524
525	nodesItemsCount := make(map[storj.NodeID]int64)
526	for rows.Next() {
527		var (
528			nodeID storj.NodeID
529			n      int64
530		)
531		err := rows.Scan(&nodeID, &n)
532		if err != nil {
533			return nil, Error.Wrap(err)
534		}
535
536		nodesItemsCount[nodeID] = n
537	}
538
539	return nodesItemsCount, Error.Wrap(rows.Err())
540}
541
542func scanRows(rows tagsql.Rows) (transferQueueItemRows []*gracefulexit.TransferQueueItem, err error) {
543	for rows.Next() {
544		transferQueueItem := &gracefulexit.TransferQueueItem{}
545		var pieceIDBytes []byte
546		err = rows.Scan(&transferQueueItem.NodeID, &transferQueueItem.StreamID, &transferQueueItem.Position, &transferQueueItem.PieceNum, &pieceIDBytes,
547			&transferQueueItem.DurabilityRatio, &transferQueueItem.QueuedAt, &transferQueueItem.RequestedAt, &transferQueueItem.LastFailedAt,
548			&transferQueueItem.LastFailedCode, &transferQueueItem.FailedCount, &transferQueueItem.FinishedAt, &transferQueueItem.OrderLimitSendCount)
549
550		if err != nil {
551			return nil, Error.Wrap(err)
552		}
553		if pieceIDBytes != nil {
554			transferQueueItem.RootPieceID, err = storj.PieceIDFromBytes(pieceIDBytes)
555			if err != nil {
556				return nil, Error.Wrap(err)
557			}
558		}
559
560		transferQueueItemRows = append(transferQueueItemRows, transferQueueItem)
561	}
562	return transferQueueItemRows, Error.Wrap(rows.Err())
563}
564
565func dbxSegmentTransferToTransferQueueItem(dbxSegmentTransfer *dbx.GracefulExitSegmentTransfer) (item *gracefulexit.TransferQueueItem, err error) {
566	nID, err := storj.NodeIDFromBytes(dbxSegmentTransfer.NodeId)
567	if err != nil {
568		return nil, Error.Wrap(err)
569	}
570
571	streamID, err := uuid.FromBytes(dbxSegmentTransfer.StreamId)
572	if err != nil {
573		return nil, Error.Wrap(err)
574	}
575
576	position := metabase.SegmentPositionFromEncoded(dbxSegmentTransfer.Position)
577
578	item = &gracefulexit.TransferQueueItem{
579		NodeID:              nID,
580		StreamID:            streamID,
581		Position:            position,
582		PieceNum:            int32(dbxSegmentTransfer.PieceNum),
583		DurabilityRatio:     dbxSegmentTransfer.DurabilityRatio,
584		QueuedAt:            dbxSegmentTransfer.QueuedAt,
585		OrderLimitSendCount: dbxSegmentTransfer.OrderLimitSendCount,
586	}
587	if dbxSegmentTransfer.RootPieceId != nil {
588		item.RootPieceID, err = storj.PieceIDFromBytes(dbxSegmentTransfer.RootPieceId)
589		if err != nil {
590			return nil, err
591		}
592	}
593	if dbxSegmentTransfer.LastFailedCode != nil {
594		item.LastFailedCode = dbxSegmentTransfer.LastFailedCode
595	}
596	if dbxSegmentTransfer.FailedCount != nil {
597		item.FailedCount = dbxSegmentTransfer.FailedCount
598	}
599	if dbxSegmentTransfer.RequestedAt != nil && !dbxSegmentTransfer.RequestedAt.IsZero() {
600		item.RequestedAt = dbxSegmentTransfer.RequestedAt
601	}
602	if dbxSegmentTransfer.LastFailedAt != nil && !dbxSegmentTransfer.LastFailedAt.IsZero() {
603		item.LastFailedAt = dbxSegmentTransfer.LastFailedAt
604	}
605	if dbxSegmentTransfer.FinishedAt != nil && !dbxSegmentTransfer.FinishedAt.IsZero() {
606		item.FinishedAt = dbxSegmentTransfer.FinishedAt
607	}
608
609	return item, nil
610}
611