1// Copyright (c) 2015-2021 MinIO, Inc.
2//
3// This file is part of MinIO Object Storage stack
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18package cmd
19
20import (
21	"bytes"
22	"context"
23	"crypto/tls"
24	"encoding/json"
25	"errors"
26	"fmt"
27	"hash/fnv"
28	"io"
29	"net"
30	"net/http"
31	"net/url"
32	"os"
33	"path"
34	"path/filepath"
35	"regexp"
36	"strings"
37	"sync"
38	"time"
39
40	"github.com/minio/mc/pkg/httptracer"
41	"github.com/minio/mc/pkg/probe"
42	minio "github.com/minio/minio-go/v7"
43	"github.com/minio/minio-go/v7/pkg/credentials"
44	"github.com/minio/minio-go/v7/pkg/encrypt"
45	"github.com/minio/minio-go/v7/pkg/lifecycle"
46	"github.com/minio/minio-go/v7/pkg/notification"
47	"github.com/minio/minio-go/v7/pkg/policy"
48	"github.com/minio/minio-go/v7/pkg/replication"
49	"github.com/minio/minio-go/v7/pkg/sse"
50
51	"github.com/minio/minio-go/v7/pkg/s3utils"
52	"github.com/minio/minio-go/v7/pkg/tags"
53	"github.com/minio/pkg/mimedb"
54)
55
// S3Client construct
//
// Client implementation backed by the minio-go SDK. Instances are
// created via S3New; the underlying *minio.Client is shared between
// S3Clients with identical connection parameters (see newFactory).
type S3Client struct {
	sync.Mutex
	targetURL    *ClientURL    // parsed target endpoint (and optional bucket/object path)
	api          *minio.Client // underlying minio-go SDK client (shared via client cache)
	virtualStyle bool          // target uses virtual-host-style bucket addressing
}
63
const (
	// Amazon S3 transfer-acceleration endpoint host.
	amazonHostNameAccelerated = "s3-accelerate.amazonaws.com"
	// Google Cloud Storage endpoint host.
	googleHostName            = "storage.googleapis.com"
	// Common prefix of server-side-encryption request headers.
	serverEncryptionKeyPrefix = "x-amz-server-side-encryption"

	// Defaults for S3 Select CSV/JSON record and field delimiters.
	defaultRecordDelimiter = "\n"
	defaultFieldDelimiter  = ","
)
72
const (
	// Keys recognized in the S3 Select input/output serialization
	// option maps (SelectObjectOpts.InputSerOpts / OutputSerOpts).
	recordDelimiterType      = "recorddelimiter"
	fieldDelimiterType       = "fielddelimiter"
	quoteCharacterType       = "quotechar"
	quoteEscapeCharacterType = "quoteescchar"
	quoteFieldsType          = "quotefields"
	fileHeaderType           = "fileheader"
	commentCharType          = "commentchar"
	typeJSONType             = "type"
	// AmzObjectLockMode sets object lock mode
	AmzObjectLockMode = "X-Amz-Object-Lock-Mode"
	// AmzObjectLockRetainUntilDate sets object lock retain until date
	AmzObjectLockRetainUntilDate = "X-Amz-Object-Lock-Retain-Until-Date"
	// AmzObjectLockLegalHold sets object lock legal hold
	AmzObjectLockLegalHold = "X-Amz-Object-Lock-Legal-Hold"
)
89
90var timeSentinel = time.Unix(0, 0).UTC()
91
92// newFactory encloses New function with client cache.
93func newFactory() func(config *Config) (Client, *probe.Error) {
94	clientCache := make(map[uint32]*minio.Client)
95	var mutex sync.Mutex
96
97	// Return New function.
98	return func(config *Config) (Client, *probe.Error) {
99		// Creates a parsed URL.
100		targetURL := newClientURL(config.HostURL)
101		// By default enable HTTPs.
102		useTLS := true
103		if targetURL.Scheme == "http" {
104			useTLS = false
105		}
106
107		// Instantiate s3
108		s3Clnt := &S3Client{}
109		// Save the target URL.
110		s3Clnt.targetURL = targetURL
111
112		// Save if target supports virtual host style.
113		hostName := targetURL.Host
114		s3Clnt.virtualStyle = isVirtualHostStyle(hostName, config.Lookup)
115		isS3AcceleratedEndpoint := isAmazonAccelerated(hostName)
116
117		if s3Clnt.virtualStyle {
118			// If Google URL replace it with 'storage.googleapis.com'
119			if isGoogle(hostName) {
120				hostName = googleHostName
121			}
122		}
123		// Generate a hash out of s3Conf.
124		confHash := fnv.New32a()
125		confHash.Write([]byte(hostName + config.AccessKey + config.SecretKey + config.SessionToken))
126		confSum := confHash.Sum32()
127
128		// Lookup previous cache by hash.
129		mutex.Lock()
130		defer mutex.Unlock()
131		var api *minio.Client
132		var found bool
133		if api, found = clientCache[confSum]; !found {
134			// if Signature version '4' use NewV4 directly.
135			creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, config.SessionToken)
136			// if Signature version '2' use NewV2 directly.
137			if strings.ToUpper(config.Signature) == "S3V2" {
138				creds = credentials.NewStaticV2(config.AccessKey, config.SecretKey, "")
139			}
140
141			var transport http.RoundTripper
142
143			if config.Transport != nil {
144				transport = config.Transport
145			} else {
146				tr := &http.Transport{
147					Proxy: http.ProxyFromEnvironment,
148					DialContext: (&net.Dialer{
149						Timeout:   10 * time.Second,
150						KeepAlive: 15 * time.Second,
151					}).DialContext,
152					MaxIdleConnsPerHost:   256,
153					IdleConnTimeout:       90 * time.Second,
154					TLSHandshakeTimeout:   10 * time.Second,
155					ExpectContinueTimeout: 10 * time.Second,
156					// Set this value so that the underlying transport round-tripper
157					// doesn't try to auto decode the body of objects with
158					// content-encoding set to `gzip`.
159					//
160					// Refer:
161					//    https://golang.org/src/net/http/transport.go?h=roundTrip#L1843
162					DisableCompression: true,
163				}
164				if useTLS {
165					// Keep TLS config.
166					tlsConfig := &tls.Config{
167						RootCAs: globalRootCAs,
168						// Can't use SSLv3 because of POODLE and BEAST
169						// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
170						// Can't use TLSv1.1 because of RC4 cipher usage
171						MinVersion: tls.VersionTLS12,
172					}
173					if config.Insecure {
174						tlsConfig.InsecureSkipVerify = true
175					}
176					tr.TLSClientConfig = tlsConfig
177
178					// Because we create a custom TLSClientConfig, we have to opt-in to HTTP/2.
179					// See https://github.com/golang/go/issues/14275
180					//
181					// TODO: Enable http2.0 when upstream issues related to HTTP/2 are fixed.
182					//
183					// if e = http2.ConfigureTransport(tr); e != nil {
184					// 	return nil, probe.NewError(e)
185					// }
186				}
187				transport = tr
188			}
189
190			if config.Debug {
191				if strings.EqualFold(config.Signature, "S3v4") {
192					transport = httptracer.GetNewTraceTransport(newTraceV4(), transport)
193				} else if strings.EqualFold(config.Signature, "S3v2") {
194					transport = httptracer.GetNewTraceTransport(newTraceV2(), transport)
195				}
196			}
197
198			// Not found. Instantiate a new MinIO
199			var e error
200
201			options := minio.Options{
202				Creds:        creds,
203				Secure:       useTLS,
204				Region:       os.Getenv("MC_REGION"),
205				BucketLookup: config.Lookup,
206				Transport:    transport,
207			}
208
209			api, e = minio.New(hostName, &options)
210			if e != nil {
211				return nil, probe.NewError(e)
212			}
213
214			// If Amazon Accelerated URL is requested enable it.
215			if isS3AcceleratedEndpoint {
216				api.SetS3TransferAccelerate(amazonHostNameAccelerated)
217			}
218
219			// Set app info.
220			api.SetAppInfo(config.AppName, config.AppVersion)
221
222			// Cache the new MinIO Client with hash of config as key.
223			clientCache[confSum] = api
224		}
225
226		// Store the new api object.
227		s3Clnt.api = api
228
229		return s3Clnt, nil
230	}
231}
232
// S3New returns an initialized S3Client structure. If debug is enabled,
// it also enables an internal trace transport. Underlying minio clients
// are cached and reused for identical connection parameters (see
// newFactory).
var S3New = newFactory()
236
// GetURL get url.
//
// Returns a copy of the client's target URL so callers cannot mutate the
// client's internal state through it.
func (c *S3Client) GetURL() ClientURL {
	return c.targetURL.Clone()
}
241
242// AddNotificationConfig - Add bucket notification
243func (c *S3Client) AddNotificationConfig(ctx context.Context, arn string, events []string, prefix, suffix string, ignoreExisting bool) *probe.Error {
244	bucket, _ := c.url2BucketAndObject()
245	// Validate total fields in ARN.
246	fields := strings.Split(arn, ":")
247	if len(fields) != 6 {
248		return errInvalidArgument()
249	}
250
251	// Get any enabled notification.
252	mb, e := c.api.GetBucketNotification(ctx, bucket)
253	if e != nil {
254		return probe.NewError(e)
255	}
256
257	accountArn := notification.NewArn(fields[1], fields[2], fields[3], fields[4], fields[5])
258	nc := notification.NewConfig(accountArn)
259
260	// Configure events
261	for _, event := range events {
262		switch event {
263		case "put":
264			nc.AddEvents(notification.ObjectCreatedAll)
265		case "delete":
266			nc.AddEvents(notification.ObjectRemovedAll)
267		case "get":
268			nc.AddEvents(notification.ObjectAccessedAll)
269		case "replica":
270			nc.AddEvents(notification.EventType("s3:Replication:*"))
271		case "ilm":
272			nc.AddEvents(notification.EventType("s3:ObjectRestore:*"))
273			nc.AddEvents(notification.EventType("s3:ObjectTransition:*"))
274		default:
275			return errInvalidArgument().Trace(events...)
276		}
277	}
278	if prefix != "" {
279		nc.AddFilterPrefix(prefix)
280	}
281	if suffix != "" {
282		nc.AddFilterSuffix(suffix)
283	}
284
285	switch fields[2] {
286	case "sns":
287		if !mb.AddTopic(nc) {
288			return errInvalidArgument().Trace("Overlapping Topic configs")
289		}
290	case "sqs":
291		if !mb.AddQueue(nc) {
292			return errInvalidArgument().Trace("Overlapping Queue configs")
293		}
294	case "lambda":
295		if !mb.AddLambda(nc) {
296			return errInvalidArgument().Trace("Overlapping lambda configs")
297		}
298	default:
299		return errInvalidArgument().Trace(fields[2])
300	}
301
302	// Set the new bucket configuration
303	if err := c.api.SetBucketNotification(ctx, bucket, mb); err != nil {
304		if ignoreExisting && strings.Contains(err.Error(), "An object key name filtering rule defined with overlapping prefixes, overlapping suffixes, or overlapping combinations of prefixes and suffixes for the same event types") {
305			return nil
306		}
307		return probe.NewError(err)
308	}
309	return nil
310}
311
312// RemoveNotificationConfig - Remove bucket notification
313func (c *S3Client) RemoveNotificationConfig(ctx context.Context, arn string, event string, prefix string, suffix string) *probe.Error {
314	bucket, _ := c.url2BucketAndObject()
315	// Remove all notification configs if arn is empty
316	if arn == "" {
317		if err := c.api.RemoveAllBucketNotification(ctx, bucket); err != nil {
318			return probe.NewError(err)
319		}
320		return nil
321	}
322
323	mb, e := c.api.GetBucketNotification(ctx, bucket)
324	if e != nil {
325		return probe.NewError(e)
326	}
327
328	fields := strings.Split(arn, ":")
329	if len(fields) != 6 {
330		return errInvalidArgument().Trace(fields...)
331	}
332	accountArn := notification.NewArn(fields[1], fields[2], fields[3], fields[4], fields[5])
333
334	// if we are passed filters for either events, suffix or prefix, then only delete the single event that matches
335	// the arguments
336	if event != "" || suffix != "" || prefix != "" {
337		// Translate events to type events for comparison
338		events := strings.Split(event, ",")
339		var eventsTyped []notification.EventType
340		for _, e := range events {
341			switch e {
342			case "put":
343				eventsTyped = append(eventsTyped, notification.ObjectCreatedAll)
344			case "delete":
345				eventsTyped = append(eventsTyped, notification.ObjectRemovedAll)
346			case "get":
347				eventsTyped = append(eventsTyped, notification.ObjectAccessedAll)
348			case "replica":
349				eventsTyped = append(eventsTyped, notification.EventType("s3:Replication:*"))
350			case "ilm":
351				eventsTyped = append(eventsTyped, notification.EventType("s3:ObjectRestore:*"))
352				eventsTyped = append(eventsTyped, notification.EventType("s3:ObjectTransition:*"))
353			default:
354				return errInvalidArgument().Trace(events...)
355			}
356		}
357		var err error
358		// based on the arn type, we'll look for the event in the corresponding sublist and delete it if there's a match
359		switch fields[2] {
360		case "sns":
361			err = mb.RemoveTopicByArnEventsPrefixSuffix(accountArn, eventsTyped, prefix, suffix)
362		case "sqs":
363			err = mb.RemoveQueueByArnEventsPrefixSuffix(accountArn, eventsTyped, prefix, suffix)
364		case "lambda":
365			err = mb.RemoveLambdaByArnEventsPrefixSuffix(accountArn, eventsTyped, prefix, suffix)
366		default:
367			return errInvalidArgument().Trace(fields[2])
368		}
369		if err != nil {
370			return probe.NewError(err)
371		}
372
373	} else {
374		// remove all events for matching arn
375		switch fields[2] {
376		case "sns":
377			mb.RemoveTopicByArn(accountArn)
378		case "sqs":
379			mb.RemoveQueueByArn(accountArn)
380		case "lambda":
381			mb.RemoveLambdaByArn(accountArn)
382		default:
383			return errInvalidArgument().Trace(fields[2])
384		}
385	}
386
387	// Set the new bucket configuration
388	if e := c.api.SetBucketNotification(ctx, bucket, mb); e != nil {
389		return probe.NewError(e)
390	}
391	return nil
392}
393
// NotificationConfig notification config
type NotificationConfig struct {
	ID     string   `json:"id"`     // notification rule identifier
	Arn    string   `json:"arn"`    // target ARN (SNS topic, SQS queue or Lambda)
	Events []string `json:"events"` // event type names this rule fires on
	Prefix string   `json:"prefix"` // optional object key prefix filter
	Suffix string   `json:"suffix"` // optional object key suffix filter
}
402
403// ListNotificationConfigs - List notification configs
404func (c *S3Client) ListNotificationConfigs(ctx context.Context, arn string) ([]NotificationConfig, *probe.Error) {
405	var configs []NotificationConfig
406	bucket, _ := c.url2BucketAndObject()
407	mb, e := c.api.GetBucketNotification(ctx, bucket)
408	if e != nil {
409		return nil, probe.NewError(e)
410	}
411
412	// Generate pretty event names from event types
413	prettyEventNames := func(eventsTypes []notification.EventType) []string {
414		var result []string
415		for _, eventType := range eventsTypes {
416			result = append(result, string(eventType))
417		}
418		return result
419	}
420
421	getFilters := func(config notification.Config) (prefix, suffix string) {
422		if config.Filter == nil {
423			return
424		}
425		for _, filter := range config.Filter.S3Key.FilterRules {
426			if strings.ToLower(filter.Name) == "prefix" {
427				prefix = filter.Value
428			}
429			if strings.ToLower(filter.Name) == "suffix" {
430				suffix = filter.Value
431			}
432
433		}
434		return prefix, suffix
435	}
436
437	for _, config := range mb.TopicConfigs {
438		if arn != "" && config.Topic != arn {
439			continue
440		}
441		prefix, suffix := getFilters(config.Config)
442		configs = append(configs, NotificationConfig{ID: config.ID,
443			Arn:    config.Topic,
444			Events: prettyEventNames(config.Events),
445			Prefix: prefix,
446			Suffix: suffix})
447	}
448
449	for _, config := range mb.QueueConfigs {
450		if arn != "" && config.Queue != arn {
451			continue
452		}
453		prefix, suffix := getFilters(config.Config)
454		configs = append(configs, NotificationConfig{ID: config.ID,
455			Arn:    config.Queue,
456			Events: prettyEventNames(config.Events),
457			Prefix: prefix,
458			Suffix: suffix})
459	}
460
461	for _, config := range mb.LambdaConfigs {
462		if arn != "" && config.Lambda != arn {
463			continue
464		}
465		prefix, suffix := getFilters(config.Config)
466		configs = append(configs, NotificationConfig{ID: config.ID,
467			Arn:    config.Lambda,
468			Events: prettyEventNames(config.Events),
469			Prefix: prefix,
470			Suffix: suffix})
471	}
472
473	return configs, nil
474}
475
// Supported content types
// NOTE(review): presumably the formats recognized for S3 Select input
// objects (including compressed variants); not referenced in this part
// of the file — verify usage before changing.
var supportedContentTypes = []string{
	"csv",
	"json",
	"gzip",
	"bzip2",
}
483
484// set the SelectObjectOutputSerialization struct using options passed in by client. If unspecified,
485// default S3 API specified defaults
486func selectObjectOutputOpts(selOpts SelectObjectOpts, i minio.SelectObjectInputSerialization) minio.SelectObjectOutputSerialization {
487	var isOK bool
488	var recDelim, fldDelim, quoteChar, quoteEscChar, qf string
489
490	o := minio.SelectObjectOutputSerialization{}
491	if _, ok := selOpts.OutputSerOpts["json"]; ok {
492		jo := minio.JSONOutputOptions{}
493		if recDelim, isOK = selOpts.OutputSerOpts["json"][recordDelimiterType]; !isOK {
494			recDelim = "\n"
495		}
496		jo.SetRecordDelimiter(recDelim)
497		o.JSON = &jo
498	}
499	if _, ok := selOpts.OutputSerOpts["csv"]; ok {
500		ocsv := minio.CSVOutputOptions{}
501		if recDelim, isOK = selOpts.OutputSerOpts["csv"][recordDelimiterType]; !isOK {
502			recDelim = defaultRecordDelimiter
503		}
504		ocsv.SetRecordDelimiter(recDelim)
505		if fldDelim, isOK = selOpts.OutputSerOpts["csv"][fieldDelimiterType]; !isOK {
506			fldDelim = defaultFieldDelimiter
507		}
508		ocsv.SetFieldDelimiter(fldDelim)
509		if quoteChar, isOK = selOpts.OutputSerOpts["csv"][quoteCharacterType]; isOK {
510			ocsv.SetQuoteCharacter(quoteChar)
511		}
512		if quoteEscChar, isOK = selOpts.OutputSerOpts["csv"][quoteEscapeCharacterType]; isOK {
513			ocsv.SetQuoteEscapeCharacter(quoteEscChar)
514		}
515		if qf, isOK = selOpts.OutputSerOpts["csv"][quoteFieldsType]; isOK {
516			ocsv.SetQuoteFields(minio.CSVQuoteFields(qf))
517		}
518		o.CSV = &ocsv
519	}
520	// default to CSV output if options left unspecified
521	if o.CSV == nil && o.JSON == nil {
522		if i.JSON != nil {
523			j := minio.JSONOutputOptions{}
524			j.SetRecordDelimiter("\n")
525			o.JSON = &j
526		} else {
527			ocsv := minio.CSVOutputOptions{}
528			ocsv.SetRecordDelimiter(defaultRecordDelimiter)
529			ocsv.SetFieldDelimiter(defaultFieldDelimiter)
530			o.CSV = &ocsv
531		}
532	}
533	return o
534}
535
// trimCompressionFileExts strips trailing compression extensions
// (".gz", ".bz", ".bz2") from name so the real file extension can be
// inspected afterwards.
func trimCompressionFileExts(name string) string {
	for _, ext := range []string{".gz", ".bz", ".bz2"} {
		name = strings.TrimSuffix(name, ext)
	}
	return name
}
539
540// set the SelectObjectInputSerialization struct using options passed in by client. If unspecified,
541// default S3 API specified defaults
542func selectObjectInputOpts(selOpts SelectObjectOpts, object string) minio.SelectObjectInputSerialization {
543	var isOK bool
544	var recDelim, fldDelim, quoteChar, quoteEscChar, fileHeader, commentChar, typ string
545
546	i := minio.SelectObjectInputSerialization{}
547	if _, ok := selOpts.InputSerOpts["parquet"]; ok {
548		iparquet := minio.ParquetInputOptions{}
549		i.Parquet = &iparquet
550	}
551	if _, ok := selOpts.InputSerOpts["json"]; ok {
552		j := minio.JSONInputOptions{}
553		if typ = selOpts.InputSerOpts["json"][typeJSONType]; typ != "" {
554			j.SetType(minio.JSONType(typ))
555		}
556		i.JSON = &j
557	}
558	if _, ok := selOpts.InputSerOpts["csv"]; ok {
559		icsv := minio.CSVInputOptions{}
560		icsv.SetRecordDelimiter(defaultRecordDelimiter)
561		if recDelim, isOK = selOpts.InputSerOpts["csv"][recordDelimiterType]; isOK {
562			icsv.SetRecordDelimiter(recDelim)
563		}
564		if fldDelim, isOK = selOpts.InputSerOpts["csv"][fieldDelimiterType]; isOK {
565			icsv.SetFieldDelimiter(fldDelim)
566		}
567		if quoteChar, isOK = selOpts.InputSerOpts["csv"][quoteCharacterType]; isOK {
568			icsv.SetQuoteCharacter(quoteChar)
569		}
570		if quoteEscChar, isOK = selOpts.InputSerOpts["csv"][quoteEscapeCharacterType]; isOK {
571			icsv.SetQuoteEscapeCharacter(quoteEscChar)
572		}
573		if fileHeader, isOK = selOpts.InputSerOpts["csv"][fileHeaderType]; isOK {
574			icsv.SetFileHeaderInfo(minio.CSVFileHeaderInfo(fileHeader))
575		}
576		if commentChar, isOK = selOpts.InputSerOpts["csv"][commentCharType]; isOK {
577			icsv.SetComments(commentChar)
578		}
579		i.CSV = &icsv
580	}
581	if i.CSV == nil && i.JSON == nil && i.Parquet == nil {
582		ext := filepath.Ext(trimCompressionFileExts(object))
583		if strings.Contains(ext, "csv") {
584			icsv := minio.CSVInputOptions{}
585			icsv.SetRecordDelimiter(defaultRecordDelimiter)
586			icsv.SetFieldDelimiter(defaultFieldDelimiter)
587			icsv.SetFileHeaderInfo(minio.CSVFileHeaderInfoUse)
588			i.CSV = &icsv
589		}
590		if strings.Contains(ext, "parquet") || strings.Contains(object, ".parquet") {
591			iparquet := minio.ParquetInputOptions{}
592			i.Parquet = &iparquet
593		}
594		if strings.Contains(ext, "json") {
595			ijson := minio.JSONInputOptions{}
596			ijson.SetType(minio.JSONLinesType)
597			i.JSON = &ijson
598		}
599	}
600	if i.CompressionType == "" {
601		i.CompressionType = selectCompressionType(selOpts, object)
602	}
603	return i
604}
605
606// get client specified compression type or default compression type from file extension
607func selectCompressionType(selOpts SelectObjectOpts, object string) minio.SelectCompressionType {
608	ext := filepath.Ext(object)
609	contentType := mimedb.TypeByExtension(ext)
610
611	if selOpts.CompressionType != "" {
612		return selOpts.CompressionType
613	}
614	if strings.Contains(ext, "parquet") || strings.Contains(object, ".parquet") {
615		return minio.SelectCompressionNONE
616	}
617	if contentType != "" {
618		if strings.Contains(contentType, "gzip") {
619			return minio.SelectCompressionGZIP
620		} else if strings.Contains(contentType, "bzip") {
621			return minio.SelectCompressionBZIP
622		}
623	}
624	return minio.SelectCompressionNONE
625}
626
627// Select - select object content wrapper.
628func (c *S3Client) Select(ctx context.Context, expression string, sse encrypt.ServerSide, selOpts SelectObjectOpts) (io.ReadCloser, *probe.Error) {
629	opts := minio.SelectObjectOptions{
630		Expression:     expression,
631		ExpressionType: minio.QueryExpressionTypeSQL,
632		// Set any encryption headers
633		ServerSideEncryption: sse,
634	}
635
636	bucket, object := c.url2BucketAndObject()
637
638	opts.InputSerialization = selectObjectInputOpts(selOpts, object)
639	opts.OutputSerialization = selectObjectOutputOpts(selOpts, opts.InputSerialization)
640	reader, e := c.api.SelectObjectContent(ctx, bucket, object, opts)
641	if e != nil {
642		return nil, probe.NewError(e)
643	}
644	return reader, nil
645}
646
647func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo {
648	var eventsInfo = make([]EventInfo, len(ninfo.Records))
649	for i, record := range ninfo.Records {
650		bucketName := record.S3.Bucket.Name
651		var key string
652		// Unescape only if needed, look for URL encoded content.
653		if strings.Contains(record.S3.Object.Key, "%2F") {
654			var e error
655			key, e = url.QueryUnescape(record.S3.Object.Key)
656			if e != nil {
657				key = record.S3.Object.Key
658			}
659		} else {
660			key = record.S3.Object.Key
661		}
662		u := c.targetURL.Clone()
663		u.Path = path.Join(string(u.Separator), bucketName, key)
664		if strings.HasPrefix(record.EventName, "s3:ObjectCreated:") {
665			if strings.HasPrefix(record.EventName, "s3:ObjectCreated:Copy") {
666				eventsInfo[i] = EventInfo{
667					Time:         record.EventTime,
668					Size:         record.S3.Object.Size,
669					UserMetadata: record.S3.Object.UserMetadata,
670					Path:         u.String(),
671					Type:         notification.ObjectCreatedCopy,
672					Host:         record.Source.Host,
673					Port:         record.Source.Port,
674					UserAgent:    record.Source.UserAgent,
675				}
676			} else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutRetention") {
677				eventsInfo[i] = EventInfo{
678					Time:         record.EventTime,
679					Size:         record.S3.Object.Size,
680					UserMetadata: record.S3.Object.UserMetadata,
681					Path:         u.String(),
682					Type:         notification.EventType("s3:ObjectCreated:PutRetention"),
683					Host:         record.Source.Host,
684					Port:         record.Source.Port,
685					UserAgent:    record.Source.UserAgent,
686				}
687			} else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutLegalHold") {
688				eventsInfo[i] = EventInfo{
689					Time:         record.EventTime,
690					Size:         record.S3.Object.Size,
691					UserMetadata: record.S3.Object.UserMetadata,
692					Path:         u.String(),
693					Type:         notification.EventType("s3:ObjectCreated:PutLegalHold"),
694					Host:         record.Source.Host,
695					Port:         record.Source.Port,
696					UserAgent:    record.Source.UserAgent,
697				}
698			} else {
699				eventsInfo[i] = EventInfo{
700					Time:         record.EventTime,
701					Size:         record.S3.Object.Size,
702					UserMetadata: record.S3.Object.UserMetadata,
703					Path:         u.String(),
704					Type:         notification.ObjectCreatedPut,
705					Host:         record.Source.Host,
706					Port:         record.Source.Port,
707					UserAgent:    record.Source.UserAgent,
708				}
709			}
710		} else {
711			eventsInfo[i] = EventInfo{
712				Time:         record.EventTime,
713				Size:         record.S3.Object.Size,
714				UserMetadata: record.S3.Object.UserMetadata,
715				Path:         u.String(),
716				Type:         notification.EventType(record.EventName),
717				Host:         record.Source.Host,
718				Port:         record.Source.Port,
719				UserAgent:    record.Source.UserAgent,
720			}
721		}
722	}
723	return eventsInfo
724}
725
// Watch - Start watching on all bucket events for a given account ID.
//
// Translates the mc event aliases in options.Events into S3 event type
// strings, subscribes either to a single bucket's notifications or (when
// no bucket is in the target URL) to instance-wide notifications, and
// forwards incoming records on the returned WatchObject's channels until
// the underlying listen channel closes (e.g. ctx is cancelled).
func (c *S3Client) Watch(ctx context.Context, options WatchOptions) (*WatchObject, *probe.Error) {
	// Extract bucket and object.
	bucket, object := c.url2BucketAndObject()

	// Validation
	// An object without a bucket is meaningless; an explicit object and a
	// prefix option are mutually exclusive.
	if bucket == "" && object != "" {
		return nil, errInvalidArgument().Trace(bucket, object)
	}
	if object != "" && options.Prefix != "" {
		return nil, errInvalidArgument().Trace(options.Prefix, object)
	}

	// Flag set to set the notification.
	var events []string
	for _, event := range options.Events {
		switch event {
		case "put":
			events = append(events, string(notification.ObjectCreatedAll))
		case "delete":
			events = append(events, string(notification.ObjectRemovedAll))
		case "get":
			events = append(events, string(notification.ObjectAccessedAll))
		case "replica":
			events = append(events, "s3:Replication:*") // TODO: add it to minio-go as constant
		case "ilm":
			events = append(events, "s3:ObjectRestore:*", "s3:ObjectTransition:*") // TODO: add it to minio-go as constant
		case "bucket-creation":
			events = append(events, string(notification.BucketCreatedAll))
		case "bucket-removal":
			events = append(events, string(notification.BucketRemovedAll))
		default:
			return nil, errInvalidArgument().Trace(event)
		}
	}

	// Unbuffered channels: delivery blocks until the consumer reads.
	wo := &WatchObject{
		EventInfoChan: make(chan []EventInfo),
		ErrorChan:     make(chan *probe.Error),
		DoneChan:      make(chan struct{}),
	}

	// Bucket-scoped listen when a bucket is given, otherwise listen
	// across all buckets on the server.
	var eventsCh <-chan notification.Info
	if bucket != "" {
		if object != "" && options.Prefix == "" {
			options.Prefix = object
		}
		eventsCh = c.api.ListenBucketNotification(ctx, bucket, options.Prefix, options.Suffix, events)
	} else {
		eventsCh = c.api.ListenNotification(ctx, "", "", events)
	}

	go func() {
		// Start listening on all bucket events.
		for notificationInfo := range eventsCh {
			if notificationInfo.Err != nil {
				var perr *probe.Error
				// Map server-side "NotImplemented" to a typed mc error.
				if minio.ToErrorResponse(notificationInfo.Err).Code == "NotImplemented" {
					perr = probe.NewError(APINotImplemented{
						API:     "Watch",
						APIType: c.GetURL().String(),
					})
				} else {
					perr = probe.NewError(notificationInfo.Err)
				}
				wo.Errors() <- perr
			} else {
				wo.Events() <- c.notificationToEventsInfo(notificationInfo)
			}
		}

		// The listen channel closed; close outward channels so consumers
		// stop ranging over them.
		close(wo.EventInfoChan)
		close(wo.ErrorChan)
	}()

	return wo, nil
}
803
804// Get - get object with GET options.
805func (c *S3Client) Get(ctx context.Context, opts GetOptions) (io.ReadCloser, *probe.Error) {
806	bucket, object := c.url2BucketAndObject()
807
808	reader, e := c.api.GetObject(ctx, bucket, object,
809		minio.GetObjectOptions{
810			ServerSideEncryption: opts.SSE,
811			VersionID:            opts.VersionID,
812		})
813	if e != nil {
814		errResponse := minio.ToErrorResponse(e)
815		if errResponse.Code == "NoSuchBucket" {
816			return nil, probe.NewError(BucketDoesNotExist{
817				Bucket: bucket,
818			})
819		}
820		if errResponse.Code == "InvalidBucketName" {
821			return nil, probe.NewError(BucketInvalid{
822				Bucket: bucket,
823			})
824		}
825		if errResponse.Code == "NoSuchKey" {
826			return nil, probe.NewError(ObjectMissing{})
827		}
828		return nil, probe.NewError(e)
829	}
830	return reader, nil
831}
832
833// Copy - copy object, uses server side copy API. Also uses an abstracted API
834// such that large file sizes will be copied in multipart manner on server
835// side.
836func (c *S3Client) Copy(ctx context.Context, source string, opts CopyOptions, progress io.Reader) *probe.Error {
837	dstBucket, dstObject := c.url2BucketAndObject()
838	if dstBucket == "" {
839		return probe.NewError(BucketNameEmpty{})
840	}
841
842	metadata := make(map[string]string, len(opts.metadata))
843	for k, v := range opts.metadata {
844		metadata[k] = v
845	}
846
847	delete(metadata, "X-Amz-Storage-Class")
848	if opts.storageClass != "" {
849		metadata["X-Amz-Storage-Class"] = opts.storageClass
850	}
851
852	tokens := splitStr(source, string(c.targetURL.Separator), 3)
853
854	// Source object
855	srcOpts := minio.CopySrcOptions{
856		Bucket:     tokens[1],
857		Object:     tokens[2],
858		Encryption: opts.srcSSE,
859		VersionID:  opts.versionID,
860	}
861
862	destOpts := minio.CopyDestOptions{
863		Bucket:     dstBucket,
864		Object:     dstObject,
865		Encryption: opts.tgtSSE,
866		Progress:   progress,
867		Size:       opts.size,
868	}
869
870	if lockModeStr, ok := metadata[AmzObjectLockMode]; ok {
871		destOpts.Mode = minio.RetentionMode(strings.ToUpper(lockModeStr))
872		delete(metadata, AmzObjectLockMode)
873	}
874
875	if retainUntilDateStr, ok := metadata[AmzObjectLockRetainUntilDate]; ok {
876		delete(metadata, AmzObjectLockRetainUntilDate)
877		if t, e := time.Parse(time.RFC3339, retainUntilDateStr); e == nil {
878			destOpts.RetainUntilDate = t.UTC()
879		}
880	}
881
882	if lh, ok := metadata[AmzObjectLockLegalHold]; ok {
883		destOpts.LegalHold = minio.LegalHoldStatus(lh)
884		delete(metadata, AmzObjectLockLegalHold)
885	}
886
887	// Assign metadata after irrelevant parts are delete above
888	destOpts.UserMetadata = metadata
889	destOpts.ReplaceMetadata = len(metadata) > 0
890
891	var e error
892	if opts.disableMultipart || opts.size < 64*1024*1024 {
893		_, e = c.api.CopyObject(ctx, destOpts, srcOpts)
894	} else {
895		_, e = c.api.ComposeObject(ctx, destOpts, srcOpts)
896	}
897
898	if e != nil {
899		errResponse := minio.ToErrorResponse(e)
900		if errResponse.Code == "AccessDenied" {
901			return probe.NewError(PathInsufficientPermission{
902				Path: c.targetURL.String(),
903			})
904		}
905		if errResponse.Code == "NoSuchBucket" {
906			return probe.NewError(BucketDoesNotExist{
907				Bucket: dstBucket,
908			})
909		}
910		if errResponse.Code == "InvalidBucketName" {
911			return probe.NewError(BucketInvalid{
912				Bucket: dstBucket,
913			})
914		}
915		if errResponse.Code == "NoSuchKey" {
916			return probe.NewError(ObjectMissing{})
917		}
918		return probe.NewError(e)
919	}
920	return nil
921}
922
923// Put - upload an object with custom metadata.
924func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progress io.Reader, putOpts PutOptions) (int64, *probe.Error) {
925	bucket, object := c.url2BucketAndObject()
926	if bucket == "" {
927		return 0, probe.NewError(BucketNameEmpty{})
928	}
929
930	metadata := make(map[string]string, len(putOpts.metadata))
931	for k, v := range putOpts.metadata {
932		metadata[k] = v
933	}
934
935	// Do not copy storage class, it needs to be specified in putOpts
936	delete(metadata, "X-Amz-Storage-Class")
937
938	contentType, ok := metadata["Content-Type"]
939	if ok {
940		delete(metadata, "Content-Type")
941	} else {
942		// Set content-type if not specified.
943		contentType = "application/octet-stream"
944	}
945
946	cacheControl, ok := metadata["Cache-Control"]
947	if ok {
948		delete(metadata, "Cache-Control")
949	}
950
951	contentEncoding, ok := metadata["Content-Encoding"]
952	if ok {
953		delete(metadata, "Content-Encoding")
954	}
955
956	contentDisposition, ok := metadata["Content-Disposition"]
957	if ok {
958		delete(metadata, "Content-Disposition")
959	}
960
961	contentLanguage, ok := metadata["Content-Language"]
962	if ok {
963		delete(metadata, "Content-Language")
964	}
965
966	var tagsMap map[string]string
967	tagsHdr, ok := metadata["X-Amz-Tagging"]
968	if ok {
969		tagsSet, e := tags.Parse(tagsHdr, true)
970		if e != nil {
971			return 0, probe.NewError(e)
972		}
973		tagsMap = tagsSet.ToMap()
974		delete(metadata, "X-Amz-Tagging")
975	}
976
977	lockModeStr, ok := metadata[AmzObjectLockMode]
978	lockMode := minio.RetentionMode("")
979	if ok {
980		lockMode = minio.RetentionMode(strings.ToUpper(lockModeStr))
981		delete(metadata, AmzObjectLockMode)
982	}
983
984	retainUntilDate := timeSentinel
985	retainUntilDateStr, ok := metadata[AmzObjectLockRetainUntilDate]
986	if ok {
987		delete(metadata, AmzObjectLockRetainUntilDate)
988		if t, e := time.Parse(time.RFC3339, retainUntilDateStr); e == nil {
989			retainUntilDate = t.UTC()
990		}
991	}
992
993	opts := minio.PutObjectOptions{
994		UserMetadata:         metadata,
995		UserTags:             tagsMap,
996		Progress:             progress,
997		ContentType:          contentType,
998		CacheControl:         cacheControl,
999		ContentDisposition:   contentDisposition,
1000		ContentEncoding:      contentEncoding,
1001		ContentLanguage:      contentLanguage,
1002		StorageClass:         strings.ToUpper(putOpts.storageClass),
1003		ServerSideEncryption: putOpts.sse,
1004		SendContentMd5:       putOpts.md5,
1005		DisableMultipart:     putOpts.disableMultipart,
1006		PartSize:             putOpts.multipartSize,
1007		NumThreads:           putOpts.multipartThreads,
1008	}
1009
1010	if !retainUntilDate.IsZero() && !retainUntilDate.Equal(timeSentinel) {
1011		opts.RetainUntilDate = retainUntilDate
1012	}
1013
1014	if lockModeStr != "" {
1015		opts.Mode = lockMode
1016		opts.SendContentMd5 = true
1017	}
1018
1019	if lh, ok := metadata[AmzObjectLockLegalHold]; ok {
1020		delete(metadata, AmzObjectLockLegalHold)
1021		opts.LegalHold = minio.LegalHoldStatus(strings.ToUpper(lh))
1022		opts.SendContentMd5 = true
1023	}
1024
1025	ui, e := c.api.PutObject(ctx, bucket, object, reader, size, opts)
1026	if e != nil {
1027		errResponse := minio.ToErrorResponse(e)
1028		if errResponse.Code == "UnexpectedEOF" || e == io.EOF {
1029			return ui.Size, probe.NewError(UnexpectedEOF{
1030				TotalSize:    size,
1031				TotalWritten: ui.Size,
1032			})
1033		}
1034		if errResponse.Code == "AccessDenied" {
1035			return ui.Size, probe.NewError(PathInsufficientPermission{
1036				Path: c.targetURL.String(),
1037			})
1038		}
1039		if errResponse.Code == "MethodNotAllowed" {
1040			return ui.Size, probe.NewError(ObjectAlreadyExists{
1041				Object: object,
1042			})
1043		}
1044		if errResponse.Code == "XMinioObjectExistsAsDirectory" {
1045			return ui.Size, probe.NewError(ObjectAlreadyExistsAsDirectory{
1046				Object: object,
1047			})
1048		}
1049		if errResponse.Code == "NoSuchBucket" {
1050			return ui.Size, probe.NewError(BucketDoesNotExist{
1051				Bucket: bucket,
1052			})
1053		}
1054		if errResponse.Code == "InvalidBucketName" {
1055			return ui.Size, probe.NewError(BucketInvalid{
1056				Bucket: bucket,
1057			})
1058		}
1059		if errResponse.Code == "NoSuchKey" {
1060			return ui.Size, probe.NewError(ObjectMissing{})
1061		}
1062		return ui.Size, probe.NewError(e)
1063	}
1064	return ui.Size, nil
1065}
1066
1067// Remove incomplete uploads.
1068func (c *S3Client) removeIncompleteObjects(ctx context.Context, bucket string, objectsCh <-chan minio.ObjectInfo) <-chan minio.RemoveObjectError {
1069	removeObjectErrorCh := make(chan minio.RemoveObjectError)
1070
1071	// Goroutine reads from objectsCh and sends error to removeObjectErrorCh if any.
1072	go func() {
1073		defer close(removeObjectErrorCh)
1074
1075		for info := range objectsCh {
1076			if err := c.api.RemoveIncompleteUpload(ctx, bucket, info.Key); err != nil {
1077				removeObjectErrorCh <- minio.RemoveObjectError{ObjectName: info.Key, Err: err}
1078			}
1079		}
1080	}()
1081
1082	return removeObjectErrorCh
1083}
1084
// AddUserAgent - add custom user agent.
// Appends an "app/version" token to the underlying minio-go client's
// User-Agent header via SetAppInfo.
func (c *S3Client) AddUserAgent(app string, version string) {
	c.api.SetAppInfo(app, version)
}
1089
// Remove - remove object or bucket(s).
//
// Contents to delete are streamed in on contentCh; failures are reported on
// the returned error channel, which is closed once all input is processed.
//   - isIncomplete: abort incomplete multipart uploads instead of deleting
//     objects.
//   - isRemoveBucket: additionally remove each bucket once its contents are
//     gone (refused when the target URL points inside a bucket).
//   - isBypass: request governance-retention bypass on deletes.
//
// Objects are batched per bucket: one objectsCh/statusCh pair feeds the
// bulk-delete API until the incoming bucket changes, then the batch is
// flushed and a new pair is created.
func (c *S3Client) Remove(ctx context.Context, isIncomplete, isRemoveBucket, isBypass bool, contentCh <-chan *ClientContent) <-chan *probe.Error {
	errorCh := make(chan *probe.Error)

	prevBucket := ""
	// Maintain objectsCh, statusCh for each bucket
	var objectsCh chan minio.ObjectInfo
	var statusCh <-chan minio.RemoveObjectError
	opts := minio.RemoveObjectsOptions{
		GovernanceBypass: isBypass,
	}

	go func() {
		defer close(errorCh)
		if isRemoveBucket {
			if _, object := c.url2BucketAndObject(); object != "" {
				errorCh <- probe.NewError(errors.New(
					"use `mc rm` command to delete prefixes, or point your" +
						" bucket directly, `mc rb <alias>/<bucket-name>/`"))
				return
			}
		}
		for {
			select {
			case <-ctx.Done():
				errorCh <- probe.NewError(ctx.Err())
				return
			case content, ok := <-contentCh:
				if !ok {
					goto breakout
				}

				// Convert content.URL.Path to objectName for objectsCh.
				bucket, objectName := c.splitPath(content.URL.Path)
				objectVersionID := content.VersionID

				// We don't treat path when bucket is
				// empty, just skip it when it happens.
				if bucket == "" {
					continue
				}

				// Init objectsCh the first time.
				if prevBucket == "" {
					objectsCh = make(chan minio.ObjectInfo)
					prevBucket = bucket
					if isIncomplete {
						statusCh = c.removeIncompleteObjects(ctx, bucket, objectsCh)
					} else {
						statusCh = c.api.RemoveObjects(ctx, bucket, objectsCh, opts)
					}
				}

				// Bucket boundary: flush the previous bucket's pending
				// deletes (close objectsCh, drain statusCh) before
				// starting a fresh batch for the new bucket.
				if prevBucket != bucket {
					if objectsCh != nil {
						close(objectsCh)
					}
					for removeStatus := range statusCh {
						errorCh <- probe.NewError(removeStatus.Err)
					}
					// Remove bucket if it qualifies.
					if isRemoveBucket && !isIncomplete {
						if err := c.api.RemoveBucket(ctx, prevBucket); err != nil {
							errorCh <- probe.NewError(err)
						}
					}
					// Re-init objectsCh for next bucket
					objectsCh = make(chan minio.ObjectInfo)
					if isIncomplete {
						statusCh = c.removeIncompleteObjects(ctx, bucket, objectsCh)
					} else {
						statusCh = c.api.RemoveObjects(ctx, bucket, objectsCh, opts)
					}
					prevBucket = bucket
				}

				if objectName != "" {
					// Send object name once but continuously checks for pending
					// errors in parallel, the reason is that minio-go RemoveObjects
					// can block if there is any pending error not received yet.
					sent := false
					for !sent {
						select {
						case objectsCh <- minio.ObjectInfo{Key: objectName, VersionID: objectVersionID}:
							sent = true
						case removeStatus := <-statusCh:
							errorCh <- probe.NewError(removeStatus.Err)
						}
					}
				} else {
					// end of bucket - close the objectsCh
					if objectsCh != nil {
						close(objectsCh)
					}
					objectsCh = nil
				}
			}
		}

	breakout:
		// Close objectsCh at end of contentCh
		if objectsCh != nil {
			close(objectsCh)
		}
		// Write remove objects status to errorCh
		if statusCh != nil {
			for removeStatus := range statusCh {
				// If the removeStatus error message is:
				// "Object is WORM protected and cannot be overwritten",
				// it is too generic. We have the object's name and vid.
				// Adding the object's name and version id into the error msg
				removeStatus.Err = errors.New(strings.Replace(
					removeStatus.Err.Error(), "Object is WORM protected",
					"Object, '"+removeStatus.ObjectName+" (Version ID="+
						removeStatus.VersionID+")' is WORM protected", 1))
				errorCh <- probe.NewError(removeStatus.Err)
			}
		}
		// Remove last bucket if it qualifies.
		if isRemoveBucket && prevBucket != "" && !isIncomplete {
			if err := c.api.RemoveBucket(ctx, prevBucket); err != nil {
				errorCh <- probe.NewError(err)
			}
		}
	}()
	return errorCh
}
1217
1218// MakeBucket - make a new bucket.
1219func (c *S3Client) MakeBucket(ctx context.Context, region string, ignoreExisting, withLock bool) *probe.Error {
1220	bucket, object := c.url2BucketAndObject()
1221	if bucket == "" {
1222		return probe.NewError(BucketNameEmpty{})
1223	}
1224	if object != "" {
1225		if !strings.HasSuffix(object, string(c.targetURL.Separator)) {
1226			object += string(c.targetURL.Separator)
1227		}
1228		var retried bool
1229		for {
1230			_, e := c.api.PutObject(ctx, bucket, object, bytes.NewReader([]byte("")), 0,
1231				// Always send Content-MD5 to succeed with bucket with
1232				// locking enabled. There is no performance hit since
1233				// this is always an empty object
1234				minio.PutObjectOptions{SendContentMd5: true},
1235			)
1236			if e == nil {
1237				return nil
1238			}
1239			if retried {
1240				return probe.NewError(e)
1241			}
1242			switch minio.ToErrorResponse(e).Code {
1243			case "NoSuchBucket":
1244				opts := minio.MakeBucketOptions{Region: region, ObjectLocking: withLock}
1245				if e = c.api.MakeBucket(ctx, bucket, opts); e != nil {
1246					return probe.NewError(e)
1247				}
1248				retried = true
1249				continue
1250			}
1251			return probe.NewError(e)
1252		}
1253	}
1254
1255	var e error
1256	opts := minio.MakeBucketOptions{Region: region, ObjectLocking: withLock}
1257	if e = c.api.MakeBucket(ctx, bucket, opts); e != nil {
1258		// Ignore bucket already existing error when ignoreExisting flag is enabled
1259		if ignoreExisting {
1260			switch minio.ToErrorResponse(e).Code {
1261			case "BucketAlreadyOwnedByYou":
1262				fallthrough
1263			case "BucketAlreadyExists":
1264				return nil
1265			}
1266		}
1267		return probe.NewError(e)
1268	}
1269	return nil
1270}
1271
1272// RemoveBucket removes a bucket, forcibly if asked
1273func (c *S3Client) RemoveBucket(ctx context.Context, forceRemove bool) *probe.Error {
1274	bucket, object := c.url2BucketAndObject()
1275	if bucket == "" {
1276		return probe.NewError(BucketNameEmpty{})
1277	}
1278	if object != "" {
1279		return errInvalidArgument()
1280	}
1281
1282	opts := minio.BucketOptions{ForceDelete: forceRemove}
1283	if e := c.api.RemoveBucketWithOptions(ctx, bucket, opts); e != nil {
1284		return probe.NewError(e)
1285	}
1286	return nil
1287}
1288
1289// GetAccessRules - get configured policies from the server
1290func (c *S3Client) GetAccessRules(ctx context.Context) (map[string]string, *probe.Error) {
1291	bucket, object := c.url2BucketAndObject()
1292	if bucket == "" {
1293		return map[string]string{}, probe.NewError(BucketNameEmpty{})
1294	}
1295	policies := map[string]string{}
1296	policyStr, e := c.api.GetBucketPolicy(ctx, bucket)
1297	if e != nil {
1298		return nil, probe.NewError(e)
1299	}
1300	if policyStr == "" {
1301		return policies, nil
1302	}
1303	var p policy.BucketAccessPolicy
1304	if e = json.Unmarshal([]byte(policyStr), &p); e != nil {
1305		return nil, probe.NewError(e)
1306	}
1307	policyRules := policy.GetPolicies(p.Statements, bucket, object)
1308	// Hide policy data structure at this level
1309	for k, v := range policyRules {
1310		policies[k] = string(v)
1311	}
1312	return policies, nil
1313}
1314
1315// GetAccess get access policy permissions.
1316func (c *S3Client) GetAccess(ctx context.Context) (string, string, *probe.Error) {
1317	bucket, object := c.url2BucketAndObject()
1318	if bucket == "" {
1319		return "", "", probe.NewError(BucketNameEmpty{})
1320	}
1321	policyStr, e := c.api.GetBucketPolicy(ctx, bucket)
1322	if e != nil {
1323		return "", "", probe.NewError(e)
1324	}
1325	if policyStr == "" {
1326		return string(policy.BucketPolicyNone), policyStr, nil
1327	}
1328	var p policy.BucketAccessPolicy
1329	if e = json.Unmarshal([]byte(policyStr), &p); e != nil {
1330		return "", "", probe.NewError(e)
1331	}
1332	pType := string(policy.GetPolicy(p.Statements, bucket, object))
1333	if pType == string(policy.BucketPolicyNone) && policyStr != "" {
1334		pType = "custom"
1335	}
1336	return pType, policyStr, nil
1337}
1338
1339// SetAccess set access policy permissions.
1340func (c *S3Client) SetAccess(ctx context.Context, bucketPolicy string, isJSON bool) *probe.Error {
1341	bucket, object := c.url2BucketAndObject()
1342	if bucket == "" {
1343		return probe.NewError(BucketNameEmpty{})
1344	}
1345	if isJSON {
1346		if e := c.api.SetBucketPolicy(ctx, bucket, bucketPolicy); e != nil {
1347			return probe.NewError(e)
1348		}
1349		return nil
1350	}
1351	policyStr, e := c.api.GetBucketPolicy(ctx, bucket)
1352	if e != nil {
1353		return probe.NewError(e)
1354	}
1355	var p = policy.BucketAccessPolicy{Version: "2012-10-17"}
1356	if policyStr != "" {
1357		if e = json.Unmarshal([]byte(policyStr), &p); e != nil {
1358			return probe.NewError(e)
1359		}
1360	}
1361	p.Statements = policy.SetPolicy(p.Statements, policy.BucketPolicy(bucketPolicy), bucket, object)
1362	if len(p.Statements) == 0 {
1363		if e = c.api.SetBucketPolicy(ctx, bucket, ""); e != nil {
1364			return probe.NewError(e)
1365		}
1366		return nil
1367	}
1368	policyB, e := json.Marshal(p)
1369	if e != nil {
1370		return probe.NewError(e)
1371	}
1372	if e = c.api.SetBucketPolicy(ctx, bucket, string(policyB)); e != nil {
1373		return probe.NewError(e)
1374	}
1375	return nil
1376}
1377
1378// listObjectWrapper - select ObjectList mode depending on arguments
1379func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string, isRecursive bool, timeRef time.Time, withVersions, withDeleteMarkers bool, metadata bool, maxKeys int) <-chan minio.ObjectInfo {
1380	if !timeRef.IsZero() || withVersions {
1381		return c.listVersions(ctx, bucket, object, isRecursive, timeRef, withVersions, withDeleteMarkers)
1382	}
1383
1384	if isGoogle(c.targetURL.Host) {
1385		// Google Cloud S3 layer doesn't implement ListObjectsV2 implementation
1386		// https://github.com/minio/mc/issues/3073
1387		return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true, MaxKeys: maxKeys})
1388	}
1389	return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata, MaxKeys: maxKeys})
1390}
1391
1392func (c *S3Client) statIncompleteUpload(ctx context.Context, bucket, object string) (*ClientContent, *probe.Error) {
1393	nonRecursive := false
1394	objectMetadata := &ClientContent{}
1395	// Prefix to pass to minio-go listing in order to fetch a given object/directory
1396	prefix := strings.TrimRight(object, string(c.targetURL.Separator))
1397
1398	for objectMultipartInfo := range c.api.ListIncompleteUploads(ctx, bucket, prefix, nonRecursive) {
1399		if objectMultipartInfo.Err != nil {
1400			return nil, probe.NewError(objectMultipartInfo.Err)
1401		}
1402
1403		if objectMultipartInfo.Key == object {
1404			objectMetadata.URL = c.targetURL.Clone()
1405			objectMetadata.Time = objectMultipartInfo.Initiated
1406			objectMetadata.Size = objectMultipartInfo.Size
1407			objectMetadata.Type = os.FileMode(0664)
1408			objectMetadata.Metadata = map[string]string{}
1409			return objectMetadata, nil
1410		}
1411
1412		if strings.HasSuffix(objectMultipartInfo.Key, string(c.targetURL.Separator)) {
1413			objectMetadata.URL = c.targetURL.Clone()
1414			objectMetadata.Type = os.ModeDir
1415			objectMetadata.Metadata = map[string]string{}
1416			return objectMetadata, nil
1417		}
1418	}
1419	return nil, probe.NewError(ObjectMissing{})
1420}
1421
// Stat - send a 'HEAD' on a bucket or object to fetch its metadata. It also returns
// a DIR type content if a prefix does exist in the server.
//
// Resolution order: an empty bucket stats the URL root, an empty object
// stats the bucket itself, opts.incomplete stats a pending multipart
// upload, otherwise a HEAD is attempted first and a single-entry listing
// is used as a fallback to detect prefixes ("directories").
func (c *S3Client) Stat(ctx context.Context, opts StatOptions) (*ClientContent, *probe.Error) {
	c.Lock()
	defer c.Unlock()
	bucket, object := c.url2BucketAndObject()

	// Bucket name cannot be empty, stat on URL has no meaning.
	if bucket == "" {
		url := c.targetURL.Clone()
		url.Path = string(c.targetURL.Separator)
		return &ClientContent{URL: url,
			Size: 0,
			Type: os.ModeDir,
		}, nil
	}

	// No object part: stat the bucket itself.
	if object == "" {
		content, err := c.bucketStat(ctx, bucket)
		if err != nil {
			return nil, err.Trace(bucket)
		}
		return content, nil
	}

	// If the request is for incomplete upload stat, handle it here.
	if opts.incomplete {
		return c.statIncompleteUpload(ctx, bucket, object)
	}

	// The following code tries to calculate if a given prefix/object does really exist
	// using minio-go listing API. The following inputs are supported:
	//     - /path/to/existing/object
	//     - /path/to/existing_directory
	//     - /path/to/existing_directory/
	//     - /path/to/empty_directory
	//     - /path/to/empty_directory/

	// First an HEAD call is issued, this is faster than doing listing even if the object exists
	// because the list could be very large. At the same time, the HEAD call is avoided if the
	// object already contains a trailing prefix or we passed rewind flag to know the object version
	// created just before the rewind parameter.
	if !strings.HasSuffix(object, string(c.targetURL.Separator)) && opts.timeRef.IsZero() {
		// Issue HEAD request first but ignore no such key error
		// so we can check if there is such prefix which exists
		ctnt, err := c.getObjectStat(ctx, bucket, object, minio.StatObjectOptions{ServerSideEncryption: opts.sse, VersionID: opts.versionID})
		if err == nil {
			return ctnt, nil
		}

		// Ignore object missing error but return for other errors
		if !errors.As(err.ToGoError(), &ObjectMissing{}) && !errors.As(err.ToGoError(), &ObjectIsDeleteMarker{}) {
			return nil, err
		}
	}

	nonRecursive := false
	// Prefix to pass to minio-go listing in order to fetch if a prefix exists
	prefix := strings.TrimRight(object, string(c.targetURL.Separator))

	// A single listed entry is enough: it either matches the object/prefix
	// (with or without trailing separator) or proves it does not exist.
	for objectStat := range c.listObjectWrapper(ctx, bucket, prefix, nonRecursive, opts.timeRef, false, false, false, 1) {
		if objectStat.Err != nil {
			return nil, probe.NewError(objectStat.Err)
		}

		if object == objectStat.Key || object == strings.TrimSuffix(objectStat.Key, string(c.targetURL.Separator)) {
			return c.objectInfo2ClientContent(bucket, objectStat), nil
		}
		break
	}

	return nil, probe.NewError(ObjectMissing{opts.timeRef})
}
1495
1496// getObjectStat returns the metadata of an object from a HEAD call.
1497func (c *S3Client) getObjectStat(ctx context.Context, bucket, object string, opts minio.StatObjectOptions) (*ClientContent, *probe.Error) {
1498	objectStat, e := c.api.StatObject(ctx, bucket, object, opts)
1499	objectMetadata := c.objectInfo2ClientContent(bucket, objectStat)
1500	if e != nil {
1501		errResponse := minio.ToErrorResponse(e)
1502		if errResponse.Code == "AccessDenied" {
1503			return nil, probe.NewError(PathInsufficientPermission{Path: c.targetURL.String()})
1504		}
1505		if errResponse.Code == "NoSuchBucket" {
1506			return nil, probe.NewError(BucketDoesNotExist{
1507				Bucket: bucket,
1508			})
1509		}
1510		if errResponse.Code == "InvalidBucketName" {
1511			return nil, probe.NewError(BucketInvalid{
1512				Bucket: bucket,
1513			})
1514		}
1515		if errResponse.Code == "NoSuchKey" {
1516			if objectMetadata.IsDeleteMarker {
1517				return nil, probe.NewError(ObjectIsDeleteMarker{})
1518			}
1519			return nil, probe.NewError(ObjectMissing{})
1520		}
1521		return nil, probe.NewError(e)
1522	}
1523	// HEAD with a version ID will not return version in the response headers
1524	if objectMetadata.VersionID == "" {
1525		objectMetadata.VersionID = opts.VersionID
1526	}
1527	return objectMetadata, nil
1528}
1529
1530func isAmazon(host string) bool {
1531	return s3utils.IsAmazonEndpoint(url.URL{Host: host})
1532}
1533
// amazonS3ChinaHost matches AWS China region endpoints such as
// "s3.cn-north-1.amazonaws.com.cn". Compiled once at package scope
// rather than on every call.
var amazonS3ChinaHost = regexp.MustCompile(`^s3\.(cn.*?)\.amazonaws\.com\.cn$`)

// isAmazonChina reports whether host is an AWS China (amazonaws.com.cn)
// S3 endpoint.
func isAmazonChina(host string) bool {
	return amazonS3ChinaHost.MatchString(host)
}
1539
// isAmazonAccelerated reports whether host is the AWS S3 transfer
// acceleration endpoint.
func isAmazonAccelerated(host string) bool {
	const acceleratedHost = "s3-accelerate.amazonaws.com"
	return host == acceleratedHost
}
1543
1544func isGoogle(host string) bool {
1545	return s3utils.IsGoogleEndpoint(url.URL{Host: host})
1546}
1547
1548// Figure out if the URL is of 'virtual host' style.
1549// Use lookup from config to see if dns/path style look
1550// up should be used. If it is set to "auto", use virtual
1551// style for supported hosts such as Amazon S3 and Google
1552// Cloud Storage. Otherwise, default to path style
1553func isVirtualHostStyle(host string, lookup minio.BucketLookupType) bool {
1554	if lookup == minio.BucketLookupDNS {
1555		return true
1556	}
1557	if lookup == minio.BucketLookupPath {
1558		return false
1559	}
1560	return isAmazon(host) && !isAmazonChina(host) || isGoogle(host) || isAmazonAccelerated(host)
1561}
1562
// url2BucketAndObject gives bucketName and objectName from URL path.
// For virtual-host style targets the bucket is recovered from the host
// ("<bucket>.<service-host>") and prepended to the path before splitting.
func (c *S3Client) url2BucketAndObject() (bucketName, objectName string) {
	path := c.targetURL.Path
	// Convert any virtual host styled requests.
	//
	// For the time being this check is introduced for S3,
	// If you have custom virtual styled hosts please.
	// List them below.
	if c.virtualStyle {
		var bucket string
		hostIndex := strings.Index(c.targetURL.Host, "s3")
		// Discard a bare "s3" substring hit unless matchS3InHost accepts
		// the host — presumably a stricter pattern check guarding against
		// hosts that merely contain "s3"; TODO confirm against its impl.
		if hostIndex != -1 && !matchS3InHost(c.targetURL.Host) {
			hostIndex = -1
		}
		if hostIndex == -1 {
			hostIndex = strings.Index(c.targetURL.Host, "s3-accelerate")
		}
		if hostIndex == -1 {
			hostIndex = strings.Index(c.targetURL.Host, "storage.googleapis")
		}
		// hostIndex > 0 means the host looks like "<bucket>.<service>", so
		// everything before the matched service prefix is the bucket.
		if hostIndex > 0 {
			bucket = c.targetURL.Host[:hostIndex-1]
			path = string(c.targetURL.Separator) + bucket + c.targetURL.Path
		}
	}
	tokens := splitStr(path, string(c.targetURL.Separator), 3)
	return tokens[1], tokens[2]
}
1591
1592// splitPath split path into bucket and object.
1593func (c *S3Client) splitPath(path string) (bucketName, objectName string) {
1594	path = strings.TrimPrefix(path, string(c.targetURL.Separator))
1595
1596	// Handle path if its virtual style.
1597	if c.virtualStyle {
1598		hostIndex := strings.Index(c.targetURL.Host, "s3")
1599		if hostIndex == -1 {
1600			hostIndex = strings.Index(c.targetURL.Host, "s3-accelerate")
1601		}
1602		if hostIndex == -1 {
1603			hostIndex = strings.Index(c.targetURL.Host, "storage.googleapis")
1604		}
1605		if hostIndex > 0 {
1606			bucketName = c.targetURL.Host[:hostIndex-1]
1607			objectName = path
1608			return bucketName, objectName
1609		}
1610	}
1611
1612	tokens := splitStr(path, string(c.targetURL.Separator), 2)
1613	return tokens[0], tokens[1]
1614}
1615
1616/// Bucket API operations.
1617
1618func (c *S3Client) listVersions(ctx context.Context, b, o string, isRecursive bool, timeRef time.Time, includeOlderVersions, withDeleteMarkers bool) chan minio.ObjectInfo {
1619	objectInfoCh := make(chan minio.ObjectInfo)
1620	go func() {
1621		defer close(objectInfoCh)
1622		c.listVersionsRoutine(ctx, b, o, isRecursive, timeRef, includeOlderVersions, withDeleteMarkers, objectInfoCh)
1623	}()
1624	return objectInfoCh
1625}
1626
// listVersionsRoutine lists object versions of prefix o in bucket b (or
// across all buckets when b is empty), sending results on objectInfoCh.
// Only versions created before timeRef are reported; when
// includeOlderVersions is false only the newest qualifying version per key
// is sent, and delete markers are emitted only when withDeleteMarkers is
// set. Listing errors are sent on the channel as entries with Err set.
func (c *S3Client) listVersionsRoutine(ctx context.Context, b, o string, isRecursive bool, timeRef time.Time, includeOlderVersions, withDeleteMarkers bool, objectInfoCh chan minio.ObjectInfo) {
	// A zero timeRef means "now".
	if timeRef.IsZero() {
		timeRef = time.Now().UTC()
	}

	var buckets []string
	if b == "" {
		bucketsInfo, err := c.api.ListBuckets(ctx)
		if err != nil {
			objectInfoCh <- minio.ObjectInfo{
				Err: err,
			}
			return
		}
		for _, b := range bucketsInfo {
			buckets = append(buckets, b.Name)
		}
	} else {
		buckets = append(buckets, b)
	}

	for _, b := range buckets {
		// skipKey remembers the key whose qualifying version was already
		// emitted, so its older versions can be skipped below.
		var skipKey string
		for objectVersion := range c.api.ListObjects(ctx, b, minio.ListObjectsOptions{
			Prefix:       o,
			Recursive:    isRecursive,
			WithVersions: true,
		}) {
			if objectVersion.Err != nil {
				objectInfoCh <- objectVersion
				continue
			}

			if !includeOlderVersions && skipKey == objectVersion.Key {
				// Skip current version if not asked to list all versions
				// and we already listed the current object key name
				continue
			}

			if objectVersion.LastModified.Before(timeRef) {
				skipKey = objectVersion.Key

				// Skip if this is a delete marker and we are not asked to list it
				if !withDeleteMarkers && objectVersion.IsDeleteMarker {
					continue
				}

				objectInfoCh <- objectVersion
			}
		}
	}
}
1679
1680// List - list at delimited path, if not recursive.
1681func (c *S3Client) List(ctx context.Context, opts ListOptions) <-chan *ClientContent {
1682	c.Lock()
1683	defer c.Unlock()
1684
1685	contentCh := make(chan *ClientContent)
1686	go func() {
1687		defer close(contentCh)
1688		if !opts.TimeRef.IsZero() || opts.WithOlderVersions {
1689			c.versionedList(ctx, contentCh, opts)
1690		} else {
1691			c.unversionedList(ctx, contentCh, opts)
1692		}
1693	}()
1694
1695	return contentCh
1696}
1697
// versionedList returns objects versions if the S3 backend supports versioning,
// it falls back to the regular listing if not.
//
// With no bucket in the URL every bucket is walked; opts.ShowDir controls
// whether each bucket entry is emitted before or after its contents. On a
// "NotImplemented" listing error the request is retried via the
// unversioned listing path.
func (c *S3Client) versionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
	b, o := c.url2BucketAndObject()
	switch {
	case b == "" && o == "":
		buckets, err := c.api.ListBuckets(ctx)
		if err != nil {
			contentCh <- &ClientContent{
				Err: probe.NewError(err),
			}
			return
		}

		for _, bucket := range buckets {
			if opts.ShowDir != DirLast {
				contentCh <- c.bucketInfo2ClientContent(bucket)
			}
			for objectVersion := range c.listVersions(ctx, bucket.Name, "",
				opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers) {
				if objectVersion.Err != nil {
					// Backend without versioning support: fall back.
					if minio.ToErrorResponse(objectVersion.Err).Code == "NotImplemented" {
						goto noVersioning
					} else {
						contentCh <- &ClientContent{
							Err: probe.NewError(objectVersion.Err),
						}
						continue
					}
				}
				contentCh <- c.objectInfo2ClientContent(bucket.Name, objectVersion)
			}

			if opts.ShowDir == DirLast {
				contentCh <- c.bucketInfo2ClientContent(bucket)
			}
		}
		return
	default:
		for objectVersion := range c.listVersions(ctx, b, o,
			opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers) {
			if objectVersion.Err != nil {
				// Backend without versioning support: fall back.
				if minio.ToErrorResponse(objectVersion.Err).Code == "NotImplemented" {
					goto noVersioning
				} else {
					contentCh <- &ClientContent{
						Err: probe.NewError(objectVersion.Err),
					}
					continue
				}
			}
			contentCh <- c.objectInfo2ClientContent(b, objectVersion)
		}
		return
	}

noVersioning:
	c.unversionedList(ctx, contentCh, opts)

}
1758
1759// unversionedList is the non versioned S3 listing
1760func (c *S3Client) unversionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
1761	if opts.Incomplete {
1762		if opts.Recursive {
1763			c.listIncompleteRecursiveInRoutine(ctx, contentCh, opts)
1764		} else {
1765			c.listIncompleteInRoutine(ctx, contentCh, opts)
1766		}
1767	} else {
1768		if opts.Recursive {
1769			c.listRecursiveInRoutine(ctx, contentCh, opts)
1770		} else {
1771			c.listInRoutine(ctx, contentCh, opts)
1772		}
1773	}
1774}
1775
1776func (c *S3Client) listIncompleteInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
1777	// get bucket and object from URL.
1778	b, o := c.url2BucketAndObject()
1779	switch {
1780	case b == "" && o == "":
1781		buckets, err := c.api.ListBuckets(ctx)
1782		if err != nil {
1783			contentCh <- &ClientContent{
1784				Err: probe.NewError(err),
1785			}
1786			return
1787		}
1788		isRecursive := false
1789		for _, bucket := range buckets {
1790			for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive) {
1791				if object.Err != nil {
1792					contentCh <- &ClientContent{
1793						Err: probe.NewError(object.Err),
1794					}
1795					return
1796				}
1797				content := &ClientContent{}
1798				url := c.targetURL.Clone()
1799				// Join bucket with - incoming object key.
1800				url.Path = c.joinPath(bucket.Name, object.Key)
1801				switch {
1802				case strings.HasSuffix(object.Key, string(c.targetURL.Separator)):
1803					// We need to keep the trailing Separator, do not use filepath.Join().
1804					content.URL = url
1805					content.Time = time.Now()
1806					content.Type = os.ModeDir
1807				default:
1808					content.URL = url
1809					content.Size = object.Size
1810					content.Time = object.Initiated
1811					content.Type = os.ModeTemporary
1812				}
1813				contentCh <- content
1814			}
1815		}
1816	default:
1817		isRecursive := false
1818		for object := range c.api.ListIncompleteUploads(ctx, b, o, isRecursive) {
1819			if object.Err != nil {
1820				contentCh <- &ClientContent{
1821					Err: probe.NewError(object.Err),
1822				}
1823				return
1824			}
1825			content := &ClientContent{}
1826			url := c.targetURL.Clone()
1827			// Join bucket with - incoming object key.
1828			url.Path = c.joinPath(b, object.Key)
1829			switch {
1830			case strings.HasSuffix(object.Key, string(c.targetURL.Separator)):
1831				// We need to keep the trailing Separator, do not use filepath.Join().
1832				content.URL = url
1833				content.Time = time.Now()
1834				content.Type = os.ModeDir
1835			default:
1836				content.URL = url
1837				content.Size = object.Size
1838				content.Time = object.Initiated
1839				content.Type = os.ModeTemporary
1840			}
1841			contentCh <- content
1842		}
1843	}
1844}
1845
1846func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
1847	// get bucket and object from URL.
1848	b, o := c.url2BucketAndObject()
1849	switch {
1850	case b == "" && o == "":
1851		buckets, err := c.api.ListBuckets(ctx)
1852		if err != nil {
1853			contentCh <- &ClientContent{
1854				Err: probe.NewError(err),
1855			}
1856			return
1857		}
1858		isRecursive := true
1859		for _, bucket := range buckets {
1860			if opts.ShowDir != DirLast {
1861				contentCh <- c.bucketInfo2ClientContent(bucket)
1862			}
1863
1864			for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive) {
1865				if object.Err != nil {
1866					contentCh <- &ClientContent{
1867						Err: probe.NewError(object.Err),
1868					}
1869					return
1870				}
1871				url := c.targetURL.Clone()
1872				url.Path = c.joinPath(bucket.Name, object.Key)
1873				content := &ClientContent{}
1874				content.URL = url
1875				content.Size = object.Size
1876				content.Time = object.Initiated
1877				content.Type = os.ModeTemporary
1878				contentCh <- content
1879			}
1880
1881			if opts.ShowDir == DirLast {
1882				contentCh <- c.bucketInfo2ClientContent(bucket)
1883			}
1884		}
1885	default:
1886		isRecursive := true
1887		for object := range c.api.ListIncompleteUploads(ctx, b, o, isRecursive) {
1888			if object.Err != nil {
1889				contentCh <- &ClientContent{
1890					Err: probe.NewError(object.Err),
1891				}
1892				return
1893			}
1894			url := c.targetURL.Clone()
1895			// Join bucket and incoming object key.
1896			url.Path = c.joinPath(b, object.Key)
1897			content := &ClientContent{}
1898			content.URL = url
1899			content.Size = object.Size
1900			content.Time = object.Initiated
1901			content.Type = os.ModeTemporary
1902			contentCh <- content
1903		}
1904	}
1905}
1906
1907// Returns new path by joining path segments with URL path separator.
1908func (c *S3Client) joinPath(bucket string, objects ...string) string {
1909	p := string(c.targetURL.Separator) + bucket
1910	for _, o := range objects {
1911		p += string(c.targetURL.Separator) + o
1912	}
1913	return p
1914}
1915
1916// Convert objectInfo to ClientContent
1917func (c *S3Client) bucketInfo2ClientContent(bucket minio.BucketInfo) *ClientContent {
1918	content := &ClientContent{}
1919	url := c.targetURL.Clone()
1920	url.Path = c.joinPath(bucket.Name)
1921	content.URL = url
1922	content.Size = 0
1923	content.Time = bucket.CreationDate
1924	content.Type = os.ModeDir
1925	return content
1926}
1927
1928// Convert objectInfo to ClientContent
1929func (c *S3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInfo) *ClientContent {
1930	content := &ClientContent{}
1931	url := c.targetURL.Clone()
1932	// Join bucket and incoming object key.
1933	if bucket == "" {
1934		panic("should never happen, bucket cannot be empty")
1935	}
1936	url.Path = c.joinPath(bucket, entry.Key)
1937	content.URL = url
1938	content.Size = entry.Size
1939	content.ETag = entry.ETag
1940	content.Time = entry.LastModified
1941	content.Expires = entry.Expires
1942	content.Expiration = entry.Expiration
1943	content.ExpirationRuleID = entry.ExpirationRuleID
1944	content.VersionID = entry.VersionID
1945	content.StorageClass = entry.StorageClass
1946	content.IsDeleteMarker = entry.IsDeleteMarker
1947	content.IsLatest = entry.IsLatest
1948	content.Restore = entry.Restore
1949	content.Metadata = map[string]string{}
1950	content.UserMetadata = map[string]string{}
1951	content.ReplicationStatus = entry.ReplicationStatus
1952	for k, v := range entry.UserMetadata {
1953		content.UserMetadata[k] = v
1954	}
1955	for k := range entry.Metadata {
1956		content.Metadata[k] = entry.Metadata.Get(k)
1957	}
1958	attr, _ := parseAttribute(content.UserMetadata)
1959	if len(attr) > 0 {
1960		_, mtime, _ := parseAtimeMtime(attr)
1961		if !mtime.IsZero() {
1962			content.Time = mtime
1963		}
1964	}
1965	attr, _ = parseAttribute(content.Metadata)
1966	if len(attr) > 0 {
1967		_, mtime, _ := parseAtimeMtime(attr)
1968		if !mtime.IsZero() {
1969			content.Time = mtime
1970		}
1971	}
1972
1973	if strings.HasSuffix(entry.Key, string(c.targetURL.Separator)) {
1974		content.Type = os.ModeDir
1975		if content.Time.IsZero() {
1976			content.Time = time.Now()
1977		}
1978	} else {
1979		content.Type = os.FileMode(0664)
1980	}
1981
1982	return content
1983}
1984
1985// Returns bucket stat info of current bucket.
1986func (c *S3Client) bucketStat(ctx context.Context, bucket string) (*ClientContent, *probe.Error) {
1987	exists, e := c.api.BucketExists(ctx, bucket)
1988	if e != nil {
1989		return nil, probe.NewError(e)
1990	}
1991	if !exists {
1992		return nil, probe.NewError(BucketDoesNotExist{Bucket: bucket})
1993	}
1994	return &ClientContent{URL: c.targetURL.Clone(), Time: time.Unix(0, 0), Type: os.ModeDir}, nil
1995}
1996
1997func (c *S3Client) listInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
1998	// get bucket and object from URL.
1999	b, o := c.url2BucketAndObject()
2000	switch {
2001	case b == "" && o == "":
2002		buckets, e := c.api.ListBuckets(ctx)
2003		if e != nil {
2004			contentCh <- &ClientContent{
2005				Err: probe.NewError(e),
2006			}
2007			return
2008		}
2009		for _, bucket := range buckets {
2010			contentCh <- c.bucketInfo2ClientContent(bucket)
2011		}
2012	case b != "" && !strings.HasSuffix(c.targetURL.Path, string(c.targetURL.Separator)) && o == "":
2013		content, err := c.bucketStat(ctx, b)
2014		if err != nil {
2015			contentCh <- &ClientContent{Err: err.Trace(b)}
2016			return
2017		}
2018		contentCh <- content
2019	default:
2020		isRecursive := false
2021		for object := range c.listObjectWrapper(ctx, b, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) {
2022			if object.Err != nil {
2023				contentCh <- &ClientContent{
2024					Err: probe.NewError(object.Err),
2025				}
2026				return
2027			}
2028
2029			// Avoid sending an empty directory when we are specifically listing it
2030			if strings.HasSuffix(object.Key, string(c.targetURL.Separator)) && o == object.Key {
2031				continue
2032			}
2033
2034			contentCh <- c.objectInfo2ClientContent(b, object)
2035		}
2036	}
2037}
2038
// S3 offers a range of storage classes designed for
// different use cases, following list captures these.
const (
	// General purpose.
	// s3StorageClassStandard = "STANDARD"
	// Infrequent access.
	// s3StorageClassInfrequent = "STANDARD_IA"
	// Reduced redundancy access.
	// s3StorageClassRedundancy = "REDUCED_REDUNDANCY"
	// Archive access — the only storage-class constant currently declared;
	// the others above are kept commented for reference.
	s3StorageClassGlacier = "GLACIER"
)
2051
2052func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
2053	// get bucket and object from URL.
2054	b, o := c.url2BucketAndObject()
2055	switch {
2056	case b == "" && o == "":
2057		buckets, err := c.api.ListBuckets(ctx)
2058		if err != nil {
2059			contentCh <- &ClientContent{
2060				Err: probe.NewError(err),
2061			}
2062			return
2063		}
2064		for _, bucket := range buckets {
2065			if opts.ShowDir == DirFirst {
2066				contentCh <- c.bucketInfo2ClientContent(bucket)
2067			}
2068
2069			isRecursive := true
2070			for object := range c.listObjectWrapper(ctx, bucket.Name, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) {
2071				if object.Err != nil {
2072					contentCh <- &ClientContent{
2073						Err: probe.NewError(object.Err),
2074					}
2075					return
2076				}
2077				contentCh <- c.objectInfo2ClientContent(bucket.Name, object)
2078			}
2079
2080			if opts.ShowDir == DirLast {
2081				contentCh <- c.bucketInfo2ClientContent(bucket)
2082			}
2083		}
2084	default:
2085		isRecursive := true
2086		for object := range c.listObjectWrapper(ctx, b, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) {
2087			if object.Err != nil {
2088				contentCh <- &ClientContent{
2089					Err: probe.NewError(object.Err),
2090				}
2091				return
2092			}
2093			contentCh <- c.objectInfo2ClientContent(b, object)
2094		}
2095	}
2096}
2097
2098// ShareDownload - get a usable presigned object url to share.
2099func (c *S3Client) ShareDownload(ctx context.Context, versionID string, expires time.Duration) (string, *probe.Error) {
2100	bucket, object := c.url2BucketAndObject()
2101	// No additional request parameters are set for the time being.
2102	reqParams := make(url.Values)
2103	if versionID != "" {
2104		reqParams.Set("versionId", versionID)
2105	}
2106	presignedURL, e := c.api.PresignedGetObject(ctx, bucket, object, expires, reqParams)
2107	if e != nil {
2108		return "", probe.NewError(e)
2109	}
2110	return presignedURL.String(), nil
2111}
2112
2113// ShareUpload - get data for presigned post http form upload.
2114func (c *S3Client) ShareUpload(ctx context.Context, isRecursive bool, expires time.Duration, contentType string) (string, map[string]string, *probe.Error) {
2115	bucket, object := c.url2BucketAndObject()
2116	p := minio.NewPostPolicy()
2117	if e := p.SetExpires(UTCNow().Add(expires)); e != nil {
2118		return "", nil, probe.NewError(e)
2119	}
2120	if strings.TrimSpace(contentType) != "" || contentType != "" {
2121		// No need to verify for error here, since we have stripped out spaces.
2122		p.SetContentType(contentType)
2123	}
2124	if e := p.SetBucket(bucket); e != nil {
2125		return "", nil, probe.NewError(e)
2126	}
2127	if isRecursive {
2128		if e := p.SetKeyStartsWith(object); e != nil {
2129			return "", nil, probe.NewError(e)
2130		}
2131	} else {
2132		if e := p.SetKey(object); e != nil {
2133			return "", nil, probe.NewError(e)
2134		}
2135	}
2136	u, m, e := c.api.PresignedPostPolicy(ctx, p)
2137	if e != nil {
2138		return "", nil, probe.NewError(e)
2139	}
2140	return u.String(), m, nil
2141}
2142
2143// SetObjectLockConfig - Set object lock configurataion of bucket.
2144func (c *S3Client) SetObjectLockConfig(ctx context.Context, mode minio.RetentionMode, validity uint64, unit minio.ValidityUnit) *probe.Error {
2145	bucket, _ := c.url2BucketAndObject()
2146
2147	// FIXME: This is too ugly, fix minio-go
2148	vuint := (uint)(validity)
2149	if mode != "" && vuint > 0 && unit != "" {
2150		e := c.api.SetBucketObjectLockConfig(ctx, bucket, &mode, &vuint, &unit)
2151		if e != nil {
2152			return probe.NewError(e).Trace(c.GetURL().String())
2153		}
2154		return nil
2155	}
2156	if mode == "" && vuint == 0 && unit == "" {
2157		e := c.api.SetBucketObjectLockConfig(ctx, bucket, nil, nil, nil)
2158		if e != nil {
2159			return probe.NewError(e).Trace(c.GetURL().String())
2160		}
2161		return nil
2162	}
2163	return errInvalidArgument().Trace(c.GetURL().String())
2164}
2165
2166// PutObjectRetention - Set object retention for a given object.
2167func (c *S3Client) PutObjectRetention(ctx context.Context, versionID string, mode minio.RetentionMode, retainUntilDate time.Time, bypassGovernance bool) *probe.Error {
2168	bucket, object := c.url2BucketAndObject()
2169
2170	var (
2171		modePtr            *minio.RetentionMode
2172		retainUntilDatePtr *time.Time
2173	)
2174
2175	if mode != "" && retainUntilDate.IsZero() {
2176		return errInvalidArgument().Trace(c.GetURL().String())
2177	}
2178
2179	if mode != "" {
2180		modePtr = &mode
2181		retainUntilDatePtr = &retainUntilDate
2182	}
2183
2184	opts := minio.PutObjectRetentionOptions{
2185		VersionID:        versionID,
2186		RetainUntilDate:  retainUntilDatePtr,
2187		Mode:             modePtr,
2188		GovernanceBypass: bypassGovernance,
2189	}
2190	e := c.api.PutObjectRetention(ctx, bucket, object, opts)
2191	if e != nil {
2192		return probe.NewError(e).Trace(c.GetURL().String())
2193	}
2194	return nil
2195}
2196
2197// GetObjectRetention - Get object retention for a given object.
2198func (c *S3Client) GetObjectRetention(ctx context.Context, versionID string) (minio.RetentionMode, time.Time, *probe.Error) {
2199	bucket, object := c.url2BucketAndObject()
2200	if object == "" {
2201		return "", time.Time{}, probe.NewError(ObjectNameEmpty{}).Trace(c.GetURL().String())
2202	}
2203	modePtr, untilPtr, e := c.api.GetObjectRetention(ctx, bucket, object, versionID)
2204	if e != nil {
2205		return "", time.Time{}, probe.NewError(e).Trace(c.GetURL().String())
2206	}
2207	var (
2208		mode  minio.RetentionMode
2209		until time.Time
2210	)
2211	if modePtr != nil {
2212		mode = *modePtr
2213	}
2214	if untilPtr != nil {
2215		until = *untilPtr
2216	}
2217	return mode, until, nil
2218}
2219
2220// PutObjectLegalHold - Set object legal hold for a given object.
2221func (c *S3Client) PutObjectLegalHold(ctx context.Context, versionID string, lhold minio.LegalHoldStatus) *probe.Error {
2222	bucket, object := c.url2BucketAndObject()
2223	if lhold.IsValid() {
2224		opts := minio.PutObjectLegalHoldOptions{
2225			Status:    &lhold,
2226			VersionID: versionID,
2227		}
2228		e := c.api.PutObjectLegalHold(ctx, bucket, object, opts)
2229		if e != nil {
2230			return probe.NewError(e).Trace(c.GetURL().String())
2231		}
2232		return nil
2233	}
2234	return errInvalidArgument().Trace(c.GetURL().String())
2235}
2236
2237// GetObjectLegalHold - Get object legal hold for a given object.
2238func (c *S3Client) GetObjectLegalHold(ctx context.Context, versionID string) (minio.LegalHoldStatus, *probe.Error) {
2239	var lhold minio.LegalHoldStatus
2240	bucket, object := c.url2BucketAndObject()
2241	opts := minio.GetObjectLegalHoldOptions{
2242		VersionID: versionID,
2243	}
2244	lhPtr, e := c.api.GetObjectLegalHold(ctx, bucket, object, opts)
2245	if e != nil {
2246		errResp := minio.ToErrorResponse(e)
2247		if errResp.Code != "NoSuchObjectLockConfiguration" {
2248			return "", probe.NewError(e).Trace(c.GetURL().String())
2249		}
2250		return "", nil
2251	}
2252	// lhPtr can be nil if there is no legalhold status set
2253	if lhPtr != nil {
2254		lhold = *lhPtr
2255	}
2256	return lhold, nil
2257}
2258
2259// GetObjectLockConfig - Get object lock configuration of bucket.
2260func (c *S3Client) GetObjectLockConfig(ctx context.Context) (string, minio.RetentionMode, uint64, minio.ValidityUnit, *probe.Error) {
2261	bucket, _ := c.url2BucketAndObject()
2262
2263	status, mode, validity, unit, e := c.api.GetObjectLockConfig(ctx, bucket)
2264	if e != nil {
2265		return "", "", 0, "", probe.NewError(e).Trace(c.GetURL().String())
2266	}
2267
2268	if mode != nil && validity != nil && unit != nil {
2269		// FIXME: this is too ugly, fix minio-go
2270		vuint64 := uint64(*validity)
2271		return status, *mode, vuint64, *unit, nil
2272	}
2273
2274	return status, "", 0, "", nil
2275}
2276
2277// GetTags - Get tags of bucket or object.
2278func (c *S3Client) GetTags(ctx context.Context, versionID string) (map[string]string, *probe.Error) {
2279	bucketName, objectName := c.url2BucketAndObject()
2280	if bucketName == "" {
2281		return nil, probe.NewError(BucketNameEmpty{})
2282	}
2283
2284	if objectName == "" {
2285		if versionID != "" {
2286			return nil, probe.NewError(errors.New("getting bucket tags does not support versioning parameters"))
2287		}
2288
2289		tags, err := c.api.GetBucketTagging(ctx, bucketName)
2290		if err != nil {
2291			return nil, probe.NewError(err)
2292		}
2293
2294		return tags.ToMap(), nil
2295	}
2296
2297	tags, err := c.api.GetObjectTagging(ctx, bucketName, objectName, minio.GetObjectTaggingOptions{VersionID: versionID})
2298	if err != nil {
2299		return nil, probe.NewError(err)
2300	}
2301
2302	return tags.ToMap(), nil
2303}
2304
2305// SetTags - Set tags of bucket or object.
2306func (c *S3Client) SetTags(ctx context.Context, versionID, tagString string) *probe.Error {
2307	bucketName, objectName := c.url2BucketAndObject()
2308	if bucketName == "" {
2309		return probe.NewError(BucketNameEmpty{})
2310	}
2311
2312	tags, err := tags.Parse(tagString, objectName != "")
2313	if err != nil {
2314		return probe.NewError(err)
2315	}
2316
2317	if objectName == "" {
2318		if versionID != "" {
2319			return probe.NewError(errors.New("setting bucket tags does not support versioning parameters"))
2320		}
2321		err = c.api.SetBucketTagging(ctx, bucketName, tags)
2322	} else {
2323		err = c.api.PutObjectTagging(ctx, bucketName, objectName, tags, minio.PutObjectTaggingOptions{VersionID: versionID})
2324	}
2325
2326	if err != nil {
2327		return probe.NewError(err)
2328	}
2329
2330	return nil
2331}
2332
2333// DeleteTags - Delete tags of bucket or object
2334func (c *S3Client) DeleteTags(ctx context.Context, versionID string) *probe.Error {
2335	bucketName, objectName := c.url2BucketAndObject()
2336	if bucketName == "" {
2337		return probe.NewError(BucketNameEmpty{})
2338	}
2339
2340	var err error
2341	if objectName == "" {
2342		if versionID != "" {
2343			return probe.NewError(errors.New("setting bucket tags does not support versioning parameters"))
2344		}
2345		err = c.api.RemoveBucketTagging(ctx, bucketName)
2346	} else {
2347		err = c.api.RemoveObjectTagging(ctx, bucketName, objectName, minio.RemoveObjectTaggingOptions{VersionID: versionID})
2348	}
2349
2350	if err != nil {
2351		return probe.NewError(err)
2352	}
2353
2354	return nil
2355}
2356
2357// GetLifecycle - Get current lifecycle configuration.
2358func (c *S3Client) GetLifecycle(ctx context.Context) (*lifecycle.Configuration, *probe.Error) {
2359	bucket, _ := c.url2BucketAndObject()
2360	if bucket == "" {
2361		return nil, probe.NewError(BucketNameEmpty{})
2362	}
2363
2364	config, e := c.api.GetBucketLifecycle(ctx, bucket)
2365	if e != nil {
2366		return nil, probe.NewError(e)
2367	}
2368
2369	return config, nil
2370}
2371
2372// SetLifecycle - Set lifecycle configuration on a bucket
2373func (c *S3Client) SetLifecycle(ctx context.Context, config *lifecycle.Configuration) *probe.Error {
2374	bucket, _ := c.url2BucketAndObject()
2375	if bucket == "" {
2376		return probe.NewError(BucketNameEmpty{})
2377	}
2378
2379	if e := c.api.SetBucketLifecycle(ctx, bucket, config); e != nil {
2380		return probe.NewError(e)
2381	}
2382
2383	return nil
2384}
2385
2386// GetVersion - gets bucket version info.
2387func (c *S3Client) GetVersion(ctx context.Context) (config minio.BucketVersioningConfiguration, err *probe.Error) {
2388	bucket, _ := c.url2BucketAndObject()
2389	if bucket == "" {
2390		return config, probe.NewError(BucketNameEmpty{})
2391	}
2392	var e error
2393	config, e = c.api.GetBucketVersioning(ctx, bucket)
2394	if e != nil {
2395		return config, probe.NewError(e)
2396	}
2397
2398	return config, nil
2399}
2400
2401// SetVersion - Set version configuration on a bucket
2402func (c *S3Client) SetVersion(ctx context.Context, status string) *probe.Error {
2403	bucket, _ := c.url2BucketAndObject()
2404	if bucket == "" {
2405		return probe.NewError(BucketNameEmpty{})
2406	}
2407	var err error
2408	switch status {
2409	case "enable":
2410		err = c.api.EnableVersioning(ctx, bucket)
2411	case "suspend":
2412		err = c.api.SuspendVersioning(ctx, bucket)
2413	default:
2414		return probe.NewError(fmt.Errorf("Invalid versioning status"))
2415	}
2416	return probe.NewError(err)
2417}
2418
2419// GetReplication - gets replication configuration for a given bucket.
2420func (c *S3Client) GetReplication(ctx context.Context) (replication.Config, *probe.Error) {
2421	bucket, _ := c.url2BucketAndObject()
2422	if bucket == "" {
2423		return replication.Config{}, probe.NewError(BucketNameEmpty{})
2424	}
2425
2426	replicationCfg, e := c.api.GetBucketReplication(ctx, bucket)
2427	if e != nil {
2428		return replication.Config{}, probe.NewError(e)
2429	}
2430	return replicationCfg, nil
2431}
2432
2433// RemoveReplication - removes replication configuration for a given bucket.
2434func (c *S3Client) RemoveReplication(ctx context.Context) *probe.Error {
2435	bucket, _ := c.url2BucketAndObject()
2436	if bucket == "" {
2437		return probe.NewError(BucketNameEmpty{})
2438	}
2439
2440	e := c.api.RemoveBucketReplication(ctx, bucket)
2441	return probe.NewError(e)
2442}
2443
2444// SetReplication sets replication configuration for a given bucket.
2445func (c *S3Client) SetReplication(ctx context.Context, cfg *replication.Config, opts replication.Options) *probe.Error {
2446	bucket, objectPrefix := c.url2BucketAndObject()
2447	if bucket == "" {
2448		return probe.NewError(BucketNameEmpty{})
2449	}
2450	opts.Prefix = objectPrefix
2451	switch opts.Op {
2452	case replication.AddOption:
2453		if e := cfg.AddRule(opts); e != nil {
2454			return probe.NewError(e)
2455		}
2456	case replication.SetOption:
2457		if e := cfg.EditRule(opts); e != nil {
2458			return probe.NewError(e)
2459		}
2460	case replication.RemoveOption:
2461		if e := cfg.RemoveRule(opts); e != nil {
2462			return probe.NewError(e)
2463		}
2464	case replication.ImportOption:
2465	default:
2466		return probe.NewError(fmt.Errorf("Invalid replication option"))
2467	}
2468	if e := c.api.SetBucketReplication(ctx, bucket, *cfg); e != nil {
2469		return probe.NewError(e)
2470	}
2471	return nil
2472}
2473
2474// GetReplicationMetrics - Get replication metrics for a given bucket.
2475func (c *S3Client) GetReplicationMetrics(ctx context.Context) (replication.Metrics, *probe.Error) {
2476	bucket, _ := c.url2BucketAndObject()
2477	if bucket == "" {
2478		return replication.Metrics{}, probe.NewError(BucketNameEmpty{})
2479	}
2480
2481	metrics, e := c.api.GetBucketReplicationMetrics(ctx, bucket)
2482	if e != nil {
2483		return replication.Metrics{}, probe.NewError(e)
2484	}
2485	return metrics, nil
2486}
2487
2488// ResetReplication - kicks off replication again on previously replicated objects if existing object
2489// replication is enabled in the replication config.Optional to provide a timestamp
2490func (c *S3Client) ResetReplication(ctx context.Context, before time.Duration, tgtArn string) (rinfo replication.ResyncTargetsInfo, err *probe.Error) {
2491	bucket, _ := c.url2BucketAndObject()
2492	if bucket == "" {
2493		return rinfo, probe.NewError(BucketNameEmpty{})
2494	}
2495
2496	rinfo, e := c.api.ResetBucketReplicationOnTarget(ctx, bucket, before, tgtArn)
2497	if e != nil {
2498		return rinfo, probe.NewError(e)
2499	}
2500	return rinfo, nil
2501}
2502
2503// GetEncryption - gets bucket encryption info.
2504func (c *S3Client) GetEncryption(ctx context.Context) (algorithm, keyID string, err *probe.Error) {
2505	bucket, _ := c.url2BucketAndObject()
2506	if bucket == "" {
2507		return "", "", probe.NewError(BucketNameEmpty{})
2508	}
2509
2510	config, e := c.api.GetBucketEncryption(ctx, bucket)
2511	if e != nil {
2512		return "", "", probe.NewError(e)
2513	}
2514	for _, rule := range config.Rules {
2515		algorithm = rule.Apply.SSEAlgorithm
2516		if rule.Apply.KmsMasterKeyID != "" {
2517			keyID = rule.Apply.KmsMasterKeyID
2518			break
2519		}
2520	}
2521	return algorithm, keyID, nil
2522}
2523
2524// SetEncryption - Set encryption configuration on a bucket
2525func (c *S3Client) SetEncryption(ctx context.Context, encType string, kmsKeyID string) *probe.Error {
2526	bucket, _ := c.url2BucketAndObject()
2527	if bucket == "" {
2528		return probe.NewError(BucketNameEmpty{})
2529	}
2530	var config *sse.Configuration
2531	switch strings.ToLower(encType) {
2532	case "sse-kms":
2533		config = sse.NewConfigurationSSEKMS(kmsKeyID)
2534	case "sse-s3":
2535		config = sse.NewConfigurationSSES3()
2536	default:
2537		return probe.NewError(fmt.Errorf("Invalid encryption algorithm %s", encType))
2538	}
2539	if err := c.api.SetBucketEncryption(ctx, bucket, config); err != nil {
2540		return probe.NewError(err)
2541	}
2542	return nil
2543}
2544
2545// DeleteEncryption - removes encryption configuration on a bucket
2546func (c *S3Client) DeleteEncryption(ctx context.Context) *probe.Error {
2547	bucket, _ := c.url2BucketAndObject()
2548	if bucket == "" {
2549		return probe.NewError(BucketNameEmpty{})
2550	}
2551	if err := c.api.RemoveBucketEncryption(ctx, bucket); err != nil {
2552		return probe.NewError(err)
2553	}
2554	return nil
2555}
2556
2557// GetBucketInfo gets info about a bucket
2558func (c *S3Client) GetBucketInfo(ctx context.Context) (BucketInfo, *probe.Error) {
2559	var b BucketInfo
2560	bucket, _ := c.url2BucketAndObject()
2561	if bucket == "" {
2562		return b, probe.NewError(BucketNameEmpty{})
2563	}
2564	content, err := c.bucketStat(ctx, bucket)
2565	if err != nil {
2566		return b, err.Trace(bucket)
2567	}
2568	b.URL = content.URL
2569	b.Size = content.Size
2570	b.Type = content.Type
2571	b.Date = content.Time
2572	if vcfg, err := c.GetVersion(ctx); err == nil {
2573		b.Versioning.Status = vcfg.Status
2574		b.Versioning.MFADelete = vcfg.MFADelete
2575	}
2576	if enabled, mode, validity, unit, err := c.api.GetObjectLockConfig(ctx, bucket); err == nil {
2577		if mode != nil {
2578			b.Locking.Mode = *mode
2579		}
2580		b.Locking.Enabled = enabled
2581		if validity != nil && unit != nil {
2582			vuint64 := uint64(*validity)
2583			b.Locking.Validity = fmt.Sprintf("%d%s", vuint64, unit)
2584		}
2585	}
2586
2587	if rcfg, err := c.GetReplication(ctx); err == nil {
2588		if !rcfg.Empty() {
2589			b.Replication.Enabled = true
2590		}
2591	}
2592	if algo, keyID, err := c.GetEncryption(ctx); err == nil {
2593		b.Encryption.Algorithm = algo
2594		b.Encryption.KeyID = keyID
2595	}
2596
2597	if pType, policyStr, err := c.GetAccess(ctx); err == nil {
2598		b.Policy.Type = pType
2599		b.Policy.Text = policyStr
2600	}
2601	location, e := c.api.GetBucketLocation(ctx, bucket)
2602	if e != nil {
2603		return b, probe.NewError(e)
2604	}
2605	b.Location = location
2606	if tags, err := c.GetTags(ctx, ""); err == nil {
2607		b.Tagging = tags
2608	}
2609	if lfc, err := c.GetLifecycle(ctx); err == nil {
2610		b.ILM.Config = lfc
2611	}
2612	if nfc, err := c.api.GetBucketNotification(ctx, bucket); err == nil {
2613		b.Notification.Config = nfc
2614	}
2615	return b, nil
2616}
2617
2618// Restore gets a copy of an archived object
2619func (c *S3Client) Restore(ctx context.Context, versionID string, days int) *probe.Error {
2620	bucket, object := c.url2BucketAndObject()
2621	if bucket == "" {
2622		return probe.NewError(BucketNameEmpty{})
2623	}
2624	if object == "" {
2625		return probe.NewError(ObjectNameEmpty{})
2626	}
2627
2628	req := minio.RestoreRequest{}
2629	req.SetDays(days)
2630	req.SetGlacierJobParameters(minio.GlacierJobParameters{Tier: minio.TierExpedited})
2631	if err := c.api.RestoreObject(ctx, bucket, object, versionID, req); err != nil {
2632		return probe.NewError(err)
2633	}
2634	return nil
2635}
2636