1/* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2021 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package minio 19 20import ( 21 "archive/tar" 22 "bufio" 23 "bytes" 24 "context" 25 "fmt" 26 "io" 27 "io/ioutil" 28 "os" 29 "strings" 30 "sync" 31 "time" 32 33 "github.com/klauspost/compress/s2" 34) 35 36// SnowballOptions contains options for PutObjectsSnowball calls. 37type SnowballOptions struct { 38 // Opts is options applied to all objects. 39 Opts PutObjectOptions 40 41 // Processing options: 42 43 // InMemory specifies that all objects should be collected in memory 44 // before they are uploaded. 45 // If false a temporary file will be created. 46 InMemory bool 47 48 // Compress enabled content compression before upload. 49 // Compression will typically reduce memory and network usage, 50 // Compression can safely be enabled with MinIO hosts. 51 Compress bool 52} 53 54// SnowballObject contains information about a single object to be added to the snowball. 55type SnowballObject struct { 56 // Key is the destination key, including prefix. 57 Key string 58 59 // Size is the content size of this object. 60 Size int64 61 62 // Modtime to apply to the object. 63 ModTime time.Time 64 65 // Content of the object. 66 // Exactly 'Size' number of bytes must be provided. 67 Content io.Reader 68 69 // Close will be called when an object has finished processing. 70 // Note that if PutObjectsSnowball returns because of an error, 71 // objects not consumed from the input will NOT have been closed. 72 // Leave as nil for no callback. 73 Close func() 74} 75 76type nopReadSeekCloser struct { 77 io.ReadSeeker 78} 79 80func (n nopReadSeekCloser) Close() error { 81 return nil 82} 83 84// This is available as io.ReadSeekCloser from go1.16 85type readSeekCloser interface { 86 io.Reader 87 io.Closer 88 io.Seeker 89} 90 91// PutObjectsSnowball will put multiple objects with a single put call. 92// A (compressed) TAR file will be created which will contain multiple objects. 93// The key for each object will be used for the destination in the specified bucket. 94// Total size should be < 5TB. 95// This function blocks until 'objs' is closed and the content has been uploaded. 96func (c Client) PutObjectsSnowball(ctx context.Context, bucketName string, opts SnowballOptions, objs <-chan SnowballObject) (err error) { 97 err = opts.Opts.validate() 98 if err != nil { 99 return err 100 } 101 var tmpWriter io.Writer 102 var getTmpReader func() (rc readSeekCloser, sz int64, err error) 103 if opts.InMemory { 104 b := bytes.NewBuffer(nil) 105 tmpWriter = b 106 getTmpReader = func() (readSeekCloser, int64, error) { 107 return nopReadSeekCloser{bytes.NewReader(b.Bytes())}, int64(b.Len()), nil 108 } 109 } else { 110 f, err := ioutil.TempFile("", "s3-putsnowballobjects-*") 111 if err != nil { 112 return err 113 } 114 name := f.Name() 115 tmpWriter = f 116 var once sync.Once 117 defer once.Do(func() { 118 f.Close() 119 }) 120 defer os.Remove(name) 121 getTmpReader = func() (readSeekCloser, int64, error) { 122 once.Do(func() { 123 f.Close() 124 }) 125 f, err := os.Open(name) 126 if err != nil { 127 return nil, 0, err 128 } 129 st, err := f.Stat() 130 if err != nil { 131 return nil, 0, err 132 } 133 return f, st.Size(), nil 134 } 135 } 136 var flush = func() error { return nil } 137 if !opts.Compress { 138 if !opts.InMemory { 139 // Insert buffer for writes. 140 buf := bufio.NewWriterSize(tmpWriter, 1<<20) 141 flush = buf.Flush 142 tmpWriter = buf 143 } 144 } else { 145 s2c := s2.NewWriter(tmpWriter, s2.WriterBetterCompression()) 146 flush = s2c.Close 147 defer s2c.Close() 148 tmpWriter = s2c 149 } 150 t := tar.NewWriter(tmpWriter) 151 152objectLoop: 153 for { 154 select { 155 case <-ctx.Done(): 156 return ctx.Err() 157 case obj, ok := <-objs: 158 if !ok { 159 break objectLoop 160 } 161 162 closeObj := func() {} 163 if obj.Close != nil { 164 closeObj = obj.Close 165 } 166 167 // Trim accidental slash prefix. 168 obj.Key = strings.TrimPrefix(obj.Key, "/") 169 header := tar.Header{ 170 Typeflag: tar.TypeReg, 171 Name: obj.Key, 172 Size: obj.Size, 173 ModTime: obj.ModTime, 174 Format: tar.FormatPAX, 175 } 176 if err := t.WriteHeader(&header); err != nil { 177 closeObj() 178 return err 179 } 180 n, err := io.Copy(t, obj.Content) 181 if err != nil { 182 closeObj() 183 return err 184 } 185 if n != obj.Size { 186 closeObj() 187 return io.ErrUnexpectedEOF 188 } 189 closeObj() 190 } 191 } 192 // Flush tar 193 err = t.Flush() 194 if err != nil { 195 return err 196 } 197 // Flush compression 198 err = flush() 199 if err != nil { 200 return err 201 } 202 if opts.Opts.UserMetadata == nil { 203 opts.Opts.UserMetadata = map[string]string{} 204 } 205 opts.Opts.UserMetadata["X-Amz-Meta-Snowball-Auto-Extract"] = "true" 206 opts.Opts.DisableMultipart = true 207 rc, sz, err := getTmpReader() 208 if err != nil { 209 return err 210 } 211 defer rc.Close() 212 rand := c.random.Uint64() 213 _, err = c.PutObject(ctx, bucketName, fmt.Sprintf("snowball-upload-%x.tar", rand), rc, sz, opts.Opts) 214 return err 215} 216