1// Copyright (C) 2020 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package metabase
5
6import (
7	"context"
8	"time"
9
10	"github.com/zeebo/errs"
11
12	"storj.io/common/storj"
13	"storj.io/common/uuid"
14)
15
16// RawObject defines the full object that is stored in the database. It should be rarely used directly.
17type RawObject struct {
18	ObjectStream
19
20	CreatedAt time.Time
21	ExpiresAt *time.Time
22
23	Status       ObjectStatus
24	SegmentCount int32
25
26	EncryptedMetadataNonce        []byte
27	EncryptedMetadata             []byte
28	EncryptedMetadataEncryptedKey []byte
29
30	// TotalPlainSize is 0 for a migrated object.
31	TotalPlainSize     int64
32	TotalEncryptedSize int64
33	// FixedSegmentSize is 0 for a migrated object.
34	FixedSegmentSize int32
35
36	Encryption storj.EncryptionParameters
37
38	// ZombieDeletionDeadline defines when the pending raw object should be deleted from the database.
39	// This is as a safeguard against objects that failed to upload and the client has not indicated
40	// whether they want to continue uploading or delete the already uploaded data.
41	ZombieDeletionDeadline *time.Time
42}
43
44// RawSegment defines the full segment that is stored in the database. It should be rarely used directly.
45type RawSegment struct {
46	StreamID uuid.UUID
47	Position SegmentPosition
48
49	CreatedAt  time.Time // non-nillable
50	RepairedAt *time.Time
51	ExpiresAt  *time.Time
52
53	RootPieceID       storj.PieceID
54	EncryptedKeyNonce []byte
55	EncryptedKey      []byte
56
57	EncryptedSize int32 // size of the whole segment (not a piece)
58	// PlainSize is 0 for a migrated object.
59	PlainSize int32
60	// PlainOffset is 0 for a migrated object.
61	PlainOffset   int64
62	EncryptedETag []byte
63
64	Redundancy storj.RedundancyScheme
65
66	InlineData []byte
67	Pieces     Pieces
68
69	Placement storj.PlacementConstraint
70}
71
72// RawState contains full state of a table.
73type RawState struct {
74	Objects  []RawObject
75	Segments []RawSegment
76}
77
78// TestingGetState returns the state of the database.
79func (db *DB) TestingGetState(ctx context.Context) (_ *RawState, err error) {
80	state := &RawState{}
81
82	state.Objects, err = db.testingGetAllObjects(ctx)
83	if err != nil {
84		return nil, Error.New("GetState: %w", err)
85	}
86
87	state.Segments, err = db.testingGetAllSegments(ctx)
88	if err != nil {
89		return nil, Error.New("GetState: %w", err)
90	}
91
92	return state, nil
93}
94
95// TestingDeleteAll deletes all objects and segments from the database.
96func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
97	_, err = db.db.ExecContext(ctx, `
98		DELETE FROM objects;
99		DELETE FROM segments;
100		DELETE FROM node_aliases;
101		SELECT setval('node_alias_seq', 1, false);
102	`)
103	db.aliasCache = NewNodeAliasCache(db)
104	return Error.Wrap(err)
105}
106
107// testingGetAllObjects returns the state of the database.
108func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) {
109	objs := []RawObject{}
110
111	rows, err := db.db.QueryContext(ctx, `
112		SELECT
113			project_id, bucket_name, object_key, version, stream_id,
114			created_at, expires_at,
115			status, segment_count,
116			encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
117			total_plain_size, total_encrypted_size, fixed_segment_size,
118			encryption,
119			zombie_deletion_deadline
120		FROM objects
121		ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC
122	`)
123	if err != nil {
124		return nil, Error.New("testingGetAllObjects query: %w", err)
125	}
126	defer func() { err = errs.Combine(err, rows.Close()) }()
127	for rows.Next() {
128		var obj RawObject
129		err := rows.Scan(
130			&obj.ProjectID,
131			&obj.BucketName,
132			&obj.ObjectKey,
133			&obj.Version,
134			&obj.StreamID,
135
136			&obj.CreatedAt,
137			&obj.ExpiresAt,
138
139			&obj.Status, // TODO: fix encoding
140			&obj.SegmentCount,
141
142			&obj.EncryptedMetadataNonce,
143			&obj.EncryptedMetadata,
144			&obj.EncryptedMetadataEncryptedKey,
145
146			&obj.TotalPlainSize,
147			&obj.TotalEncryptedSize,
148			&obj.FixedSegmentSize,
149
150			encryptionParameters{&obj.Encryption},
151			&obj.ZombieDeletionDeadline,
152		)
153		if err != nil {
154			return nil, Error.New("testingGetAllObjects scan failed: %w", err)
155		}
156		objs = append(objs, obj)
157	}
158	if err := rows.Err(); err != nil {
159		return nil, Error.New("testingGetAllObjects scan failed: %w", err)
160	}
161
162	if len(objs) == 0 {
163		return nil, nil
164	}
165	return objs, nil
166}
167
168// testingGetAllSegments returns the state of the database.
169func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error) {
170	segs := []RawSegment{}
171
172	rows, err := db.db.QueryContext(ctx, `
173		SELECT
174			stream_id, position,
175			created_at, repaired_at, expires_at,
176			root_piece_id, encrypted_key_nonce, encrypted_key,
177			encrypted_size,
178			plain_offset, plain_size,
179			encrypted_etag,
180			redundancy,
181			inline_data, remote_alias_pieces,
182			placement
183		FROM segments
184		ORDER BY stream_id ASC, position ASC
185	`)
186	if err != nil {
187		return nil, Error.New("testingGetAllSegments query: %w", err)
188	}
189	defer func() { err = errs.Combine(err, rows.Close()) }()
190	for rows.Next() {
191		var seg RawSegment
192		var aliasPieces AliasPieces
193		err := rows.Scan(
194			&seg.StreamID,
195			&seg.Position,
196
197			&seg.CreatedAt,
198			&seg.RepairedAt,
199			&seg.ExpiresAt,
200
201			&seg.RootPieceID,
202			&seg.EncryptedKeyNonce,
203			&seg.EncryptedKey,
204
205			&seg.EncryptedSize,
206			&seg.PlainOffset,
207			&seg.PlainSize,
208			&seg.EncryptedETag,
209
210			redundancyScheme{&seg.Redundancy},
211
212			&seg.InlineData,
213			&aliasPieces,
214			&seg.Placement,
215		)
216		if err != nil {
217			return nil, Error.New("testingGetAllSegments scan failed: %w", err)
218		}
219
220		seg.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
221		if err != nil {
222			return nil, Error.New("testingGetAllSegments convert aliases to pieces failed: %w", err)
223		}
224
225		segs = append(segs, seg)
226	}
227	if err := rows.Err(); err != nil {
228		return nil, Error.New("testingGetAllSegments scan failed: %w", err)
229	}
230
231	if len(segs) == 0 {
232		return nil, nil
233	}
234	return segs, nil
235}
236