1// Copyright (C) 2020 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package metabase 5 6import ( 7 "context" 8 "time" 9 10 "github.com/zeebo/errs" 11 12 "storj.io/common/storj" 13 "storj.io/common/uuid" 14) 15 16// RawObject defines the full object that is stored in the database. It should be rarely used directly. 17type RawObject struct { 18 ObjectStream 19 20 CreatedAt time.Time 21 ExpiresAt *time.Time 22 23 Status ObjectStatus 24 SegmentCount int32 25 26 EncryptedMetadataNonce []byte 27 EncryptedMetadata []byte 28 EncryptedMetadataEncryptedKey []byte 29 30 // TotalPlainSize is 0 for a migrated object. 31 TotalPlainSize int64 32 TotalEncryptedSize int64 33 // FixedSegmentSize is 0 for a migrated object. 34 FixedSegmentSize int32 35 36 Encryption storj.EncryptionParameters 37 38 // ZombieDeletionDeadline defines when the pending raw object should be deleted from the database. 39 // This is as a safeguard against objects that failed to upload and the client has not indicated 40 // whether they want to continue uploading or delete the already uploaded data. 41 ZombieDeletionDeadline *time.Time 42} 43 44// RawSegment defines the full segment that is stored in the database. It should be rarely used directly. 45type RawSegment struct { 46 StreamID uuid.UUID 47 Position SegmentPosition 48 49 CreatedAt time.Time // non-nillable 50 RepairedAt *time.Time 51 ExpiresAt *time.Time 52 53 RootPieceID storj.PieceID 54 EncryptedKeyNonce []byte 55 EncryptedKey []byte 56 57 EncryptedSize int32 // size of the whole segment (not a piece) 58 // PlainSize is 0 for a migrated object. 59 PlainSize int32 60 // PlainOffset is 0 for a migrated object. 61 PlainOffset int64 62 EncryptedETag []byte 63 64 Redundancy storj.RedundancyScheme 65 66 InlineData []byte 67 Pieces Pieces 68 69 Placement storj.PlacementConstraint 70} 71 72// RawState contains full state of a table. 73type RawState struct { 74 Objects []RawObject 75 Segments []RawSegment 76} 77 78// TestingGetState returns the state of the database. 79func (db *DB) TestingGetState(ctx context.Context) (_ *RawState, err error) { 80 state := &RawState{} 81 82 state.Objects, err = db.testingGetAllObjects(ctx) 83 if err != nil { 84 return nil, Error.New("GetState: %w", err) 85 } 86 87 state.Segments, err = db.testingGetAllSegments(ctx) 88 if err != nil { 89 return nil, Error.New("GetState: %w", err) 90 } 91 92 return state, nil 93} 94 95// TestingDeleteAll deletes all objects and segments from the database. 96func (db *DB) TestingDeleteAll(ctx context.Context) (err error) { 97 _, err = db.db.ExecContext(ctx, ` 98 DELETE FROM objects; 99 DELETE FROM segments; 100 DELETE FROM node_aliases; 101 SELECT setval('node_alias_seq', 1, false); 102 `) 103 db.aliasCache = NewNodeAliasCache(db) 104 return Error.Wrap(err) 105} 106 107// testingGetAllObjects returns the state of the database. 108func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) { 109 objs := []RawObject{} 110 111 rows, err := db.db.QueryContext(ctx, ` 112 SELECT 113 project_id, bucket_name, object_key, version, stream_id, 114 created_at, expires_at, 115 status, segment_count, 116 encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, 117 total_plain_size, total_encrypted_size, fixed_segment_size, 118 encryption, 119 zombie_deletion_deadline 120 FROM objects 121 ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC 122 `) 123 if err != nil { 124 return nil, Error.New("testingGetAllObjects query: %w", err) 125 } 126 defer func() { err = errs.Combine(err, rows.Close()) }() 127 for rows.Next() { 128 var obj RawObject 129 err := rows.Scan( 130 &obj.ProjectID, 131 &obj.BucketName, 132 &obj.ObjectKey, 133 &obj.Version, 134 &obj.StreamID, 135 136 &obj.CreatedAt, 137 &obj.ExpiresAt, 138 139 &obj.Status, // TODO: fix encoding 140 &obj.SegmentCount, 141 142 &obj.EncryptedMetadataNonce, 143 &obj.EncryptedMetadata, 144 &obj.EncryptedMetadataEncryptedKey, 145 146 &obj.TotalPlainSize, 147 &obj.TotalEncryptedSize, 148 &obj.FixedSegmentSize, 149 150 encryptionParameters{&obj.Encryption}, 151 &obj.ZombieDeletionDeadline, 152 ) 153 if err != nil { 154 return nil, Error.New("testingGetAllObjects scan failed: %w", err) 155 } 156 objs = append(objs, obj) 157 } 158 if err := rows.Err(); err != nil { 159 return nil, Error.New("testingGetAllObjects scan failed: %w", err) 160 } 161 162 if len(objs) == 0 { 163 return nil, nil 164 } 165 return objs, nil 166} 167 168// testingGetAllSegments returns the state of the database. 169func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error) { 170 segs := []RawSegment{} 171 172 rows, err := db.db.QueryContext(ctx, ` 173 SELECT 174 stream_id, position, 175 created_at, repaired_at, expires_at, 176 root_piece_id, encrypted_key_nonce, encrypted_key, 177 encrypted_size, 178 plain_offset, plain_size, 179 encrypted_etag, 180 redundancy, 181 inline_data, remote_alias_pieces, 182 placement 183 FROM segments 184 ORDER BY stream_id ASC, position ASC 185 `) 186 if err != nil { 187 return nil, Error.New("testingGetAllSegments query: %w", err) 188 } 189 defer func() { err = errs.Combine(err, rows.Close()) }() 190 for rows.Next() { 191 var seg RawSegment 192 var aliasPieces AliasPieces 193 err := rows.Scan( 194 &seg.StreamID, 195 &seg.Position, 196 197 &seg.CreatedAt, 198 &seg.RepairedAt, 199 &seg.ExpiresAt, 200 201 &seg.RootPieceID, 202 &seg.EncryptedKeyNonce, 203 &seg.EncryptedKey, 204 205 &seg.EncryptedSize, 206 &seg.PlainOffset, 207 &seg.PlainSize, 208 &seg.EncryptedETag, 209 210 redundancyScheme{&seg.Redundancy}, 211 212 &seg.InlineData, 213 &aliasPieces, 214 &seg.Placement, 215 ) 216 if err != nil { 217 return nil, Error.New("testingGetAllSegments scan failed: %w", err) 218 } 219 220 seg.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces) 221 if err != nil { 222 return nil, Error.New("testingGetAllSegments convert aliases to pieces failed: %w", err) 223 } 224 225 segs = append(segs, seg) 226 } 227 if err := rows.Err(); err != nil { 228 return nil, Error.New("testingGetAllSegments scan failed: %w", err) 229 } 230 231 if len(segs) == 0 { 232 return nil, nil 233 } 234 return segs, nil 235} 236