1// Copyright (C) 2020 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package uplink 5 6import ( 7 "context" 8 9 "github.com/zeebo/errs" 10 11 "storj.io/uplink/private/metaclient" 12 "storj.io/uplink/private/storage/streams" 13 "storj.io/uplink/private/stream" 14) 15 16// DownloadOptions contains additional options for downloading. 17type DownloadOptions struct { 18 // When Offset is negative it will read the suffix of the blob. 19 // Combining negative offset and positive length is not supported. 20 Offset int64 21 // When Length is negative it will read until the end of the blob. 22 Length int64 23} 24 25// DownloadObject starts a download from the specific key. 26func (project *Project) DownloadObject(ctx context.Context, bucket, key string, options *DownloadOptions) (download *Download, err error) { 27 defer mon.Task()(&ctx)(&err) 28 29 if bucket == "" { 30 return nil, errwrapf("%w (%q)", ErrBucketNameInvalid, bucket) 31 } 32 if key == "" { 33 return nil, errwrapf("%w (%q)", ErrObjectKeyInvalid, key) 34 } 35 36 var opts metaclient.DownloadOptions 37 switch { 38 case options == nil: 39 opts.Range = metaclient.StreamRange{ 40 Mode: metaclient.StreamRangeAll, 41 } 42 case options.Offset < 0: 43 if options.Length >= 0 { 44 return nil, packageError.New("suffix requires length to be negative, got %v", options.Length) 45 } 46 opts.Range = metaclient.StreamRange{ 47 Mode: metaclient.StreamRangeSuffix, 48 Suffix: -options.Offset, 49 } 50 case options.Length < 0: 51 opts.Range = metaclient.StreamRange{ 52 Mode: metaclient.StreamRangeStart, 53 Start: options.Offset, 54 } 55 56 default: 57 opts.Range = metaclient.StreamRange{ 58 Mode: metaclient.StreamRangeStartLimit, 59 Start: options.Offset, 60 Limit: options.Offset + options.Length, 61 } 62 } 63 64 // N.B. we always call dbCleanup which closes the db because 65 // closing it earlier has the benefit of returning a connection to 66 // the pool, so we try to do that as early as possible. 67 68 db, err := project.dialMetainfoDB(ctx) 69 if err != nil { 70 return nil, convertKnownErrors(err, bucket, key) 71 } 72 defer func() { err = errs.Combine(err, db.Close()) }() 73 74 // TODO: handle DownloadObject & downloadInfo.ListSegments.More in the same location. 75 // currently this code is rather disjoint. 76 77 objectDownload, err := db.DownloadObject(ctx, bucket, key, opts) 78 if err != nil { 79 return nil, convertKnownErrors(err, bucket, key) 80 } 81 82 // Return the connection to the pool as soon as we can. 83 if err := db.Close(); err != nil { 84 return nil, convertKnownErrors(err, bucket, key) 85 } 86 87 streams, err := project.getStreamsStore(ctx) 88 if err != nil { 89 return nil, convertKnownErrors(err, bucket, key) 90 } 91 92 streamRange := objectDownload.Range 93 return &Download{ 94 streams: streams, 95 download: stream.NewDownloadRange(ctx, objectDownload, streams, streamRange.Start, streamRange.Limit-streamRange.Start), 96 bucket: bucket, 97 object: convertObject(&objectDownload.Object), 98 }, nil 99} 100 101// Download is a download from Storj Network. 102type Download struct { 103 download *stream.Download 104 object *Object 105 bucket string 106 streams *streams.Store 107} 108 109// Info returns the last information about the object. 110func (download *Download) Info() *Object { 111 return download.object 112} 113 114// Read downloads up to len(p) bytes into p from the object's data stream. 115// It returns the number of bytes read (0 <= n <= len(p)) and any error encountered. 116func (download *Download) Read(p []byte) (n int, err error) { 117 n, err = download.download.Read(p) 118 return n, convertKnownErrors(err, download.bucket, download.object.Key) 119} 120 121// Close closes the reader of the download. 122func (download *Download) Close() error { 123 err := errs.Combine( 124 download.download.Close(), 125 download.streams.Close(), 126 ) 127 return convertKnownErrors(err, download.bucket, download.object.Key) 128} 129