1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2021 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21	"archive/tar"
22	"bufio"
23	"bytes"
24	"context"
25	"fmt"
26	"io"
27	"io/ioutil"
28	"os"
29	"strings"
30	"sync"
31	"time"
32
33	"github.com/klauspost/compress/s2"
34)
35
36// SnowballOptions contains options for PutObjectsSnowball calls.
37type SnowballOptions struct {
38	// Opts is options applied to all objects.
39	Opts PutObjectOptions
40
41	// Processing options:
42
43	// InMemory specifies that all objects should be collected in memory
44	// before they are uploaded.
45	// If false a temporary file will be created.
46	InMemory bool
47
48	// Compress enabled content compression before upload.
49	// Compression will typically reduce memory and network usage,
50	// Compression can safely be enabled with MinIO hosts.
51	Compress bool
52}
53
// SnowballObject describes a single object that should be added to the snowball.
type SnowballObject struct {
	// Key is the destination key of the object, prefix included.
	Key string

	// Size is the number of content bytes of this object.
	Size int64

	// ModTime is the modification time to apply to the object.
	ModTime time.Time

	// Content supplies the object data.
	// It must yield exactly 'Size' bytes.
	Content io.Reader

	// Close, when not nil, is called once the object has finished processing.
	// If PutObjectsSnowball returns because of an error, objects that were
	// not yet consumed from the input will NOT have been closed.
	// Leave as nil for no callback.
	Close func()
}
75
// nopReadSeekCloser wraps an io.ReadSeeker and supplies a Close method
// that does nothing.
type nopReadSeekCloser struct {
	io.ReadSeeker
}

// Close is a no-op; it always returns nil.
func (nopReadSeekCloser) Close() error { return nil }
83
// readSeekCloser groups the Read, Seek and Close methods.
// The same interface is available as io.ReadSeekCloser from go1.16.
type readSeekCloser interface {
	io.ReadSeeker
	io.Closer
}
90
91// PutObjectsSnowball will put multiple objects with a single put call.
92// A (compressed) TAR file will be created which will contain multiple objects.
93// The key for each object will be used for the destination in the specified bucket.
94// Total size should be < 5TB.
95// This function blocks until 'objs' is closed and the content has been uploaded.
96func (c Client) PutObjectsSnowball(ctx context.Context, bucketName string, opts SnowballOptions, objs <-chan SnowballObject) (err error) {
97	err = opts.Opts.validate()
98	if err != nil {
99		return err
100	}
101	var tmpWriter io.Writer
102	var getTmpReader func() (rc readSeekCloser, sz int64, err error)
103	if opts.InMemory {
104		b := bytes.NewBuffer(nil)
105		tmpWriter = b
106		getTmpReader = func() (readSeekCloser, int64, error) {
107			return nopReadSeekCloser{bytes.NewReader(b.Bytes())}, int64(b.Len()), nil
108		}
109	} else {
110		f, err := ioutil.TempFile("", "s3-putsnowballobjects-*")
111		if err != nil {
112			return err
113		}
114		name := f.Name()
115		tmpWriter = f
116		var once sync.Once
117		defer once.Do(func() {
118			f.Close()
119		})
120		defer os.Remove(name)
121		getTmpReader = func() (readSeekCloser, int64, error) {
122			once.Do(func() {
123				f.Close()
124			})
125			f, err := os.Open(name)
126			if err != nil {
127				return nil, 0, err
128			}
129			st, err := f.Stat()
130			if err != nil {
131				return nil, 0, err
132			}
133			return f, st.Size(), nil
134		}
135	}
136	var flush = func() error { return nil }
137	if !opts.Compress {
138		if !opts.InMemory {
139			// Insert buffer for writes.
140			buf := bufio.NewWriterSize(tmpWriter, 1<<20)
141			flush = buf.Flush
142			tmpWriter = buf
143		}
144	} else {
145		s2c := s2.NewWriter(tmpWriter, s2.WriterBetterCompression())
146		flush = s2c.Close
147		defer s2c.Close()
148		tmpWriter = s2c
149	}
150	t := tar.NewWriter(tmpWriter)
151
152objectLoop:
153	for {
154		select {
155		case <-ctx.Done():
156			return ctx.Err()
157		case obj, ok := <-objs:
158			if !ok {
159				break objectLoop
160			}
161
162			closeObj := func() {}
163			if obj.Close != nil {
164				closeObj = obj.Close
165			}
166
167			// Trim accidental slash prefix.
168			obj.Key = strings.TrimPrefix(obj.Key, "/")
169			header := tar.Header{
170				Typeflag: tar.TypeReg,
171				Name:     obj.Key,
172				Size:     obj.Size,
173				ModTime:  obj.ModTime,
174				Format:   tar.FormatPAX,
175			}
176			if err := t.WriteHeader(&header); err != nil {
177				closeObj()
178				return err
179			}
180			n, err := io.Copy(t, obj.Content)
181			if err != nil {
182				closeObj()
183				return err
184			}
185			if n != obj.Size {
186				closeObj()
187				return io.ErrUnexpectedEOF
188			}
189			closeObj()
190		}
191	}
192	// Flush tar
193	err = t.Flush()
194	if err != nil {
195		return err
196	}
197	// Flush compression
198	err = flush()
199	if err != nil {
200		return err
201	}
202	if opts.Opts.UserMetadata == nil {
203		opts.Opts.UserMetadata = map[string]string{}
204	}
205	opts.Opts.UserMetadata["X-Amz-Meta-Snowball-Auto-Extract"] = "true"
206	opts.Opts.DisableMultipart = true
207	rc, sz, err := getTmpReader()
208	if err != nil {
209		return err
210	}
211	defer rc.Close()
212	rand := c.random.Uint64()
213	_, err = c.PutObject(ctx, bucketName, fmt.Sprintf("snowball-upload-%x.tar", rand), rc, sz, opts.Opts)
214	return err
215}
216