1// Copyright (c) 2015-2021 MinIO, Inc. 2// 3// This file is part of MinIO Object Storage stack 4// 5// This program is free software: you can redistribute it and/or modify 6// it under the terms of the GNU Affero General Public License as published by 7// the Free Software Foundation, either version 3 of the License, or 8// (at your option) any later version. 9// 10// This program is distributed in the hope that it will be useful 11// but WITHOUT ANY WARRANTY; without even the implied warranty of 12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13// GNU Affero General Public License for more details. 14// 15// You should have received a copy of the GNU Affero General Public License 16// along with this program. If not, see <http://www.gnu.org/licenses/>. 17 18package cmd 19 20import ( 21 "bytes" 22 "context" 23 "crypto/tls" 24 "encoding/json" 25 "errors" 26 "fmt" 27 "hash/fnv" 28 "io" 29 "net" 30 "net/http" 31 "net/url" 32 "os" 33 "path" 34 "path/filepath" 35 "regexp" 36 "strings" 37 "sync" 38 "time" 39 40 "github.com/minio/mc/pkg/httptracer" 41 "github.com/minio/mc/pkg/probe" 42 minio "github.com/minio/minio-go/v7" 43 "github.com/minio/minio-go/v7/pkg/credentials" 44 "github.com/minio/minio-go/v7/pkg/encrypt" 45 "github.com/minio/minio-go/v7/pkg/lifecycle" 46 "github.com/minio/minio-go/v7/pkg/notification" 47 "github.com/minio/minio-go/v7/pkg/policy" 48 "github.com/minio/minio-go/v7/pkg/replication" 49 "github.com/minio/minio-go/v7/pkg/sse" 50 51 "github.com/minio/minio-go/v7/pkg/s3utils" 52 "github.com/minio/minio-go/v7/pkg/tags" 53 "github.com/minio/pkg/mimedb" 54) 55 56// S3Client construct 57type S3Client struct { 58 sync.Mutex 59 targetURL *ClientURL 60 api *minio.Client 61 virtualStyle bool 62} 63 64const ( 65 amazonHostNameAccelerated = "s3-accelerate.amazonaws.com" 66 googleHostName = "storage.googleapis.com" 67 serverEncryptionKeyPrefix = "x-amz-server-side-encryption" 68 69 defaultRecordDelimiter = "\n" 70 defaultFieldDelimiter = "," 71) 72 73const ( 74 recordDelimiterType = "recorddelimiter" 75 fieldDelimiterType = "fielddelimiter" 76 quoteCharacterType = "quotechar" 77 quoteEscapeCharacterType = "quoteescchar" 78 quoteFieldsType = "quotefields" 79 fileHeaderType = "fileheader" 80 commentCharType = "commentchar" 81 typeJSONType = "type" 82 // AmzObjectLockMode sets object lock mode 83 AmzObjectLockMode = "X-Amz-Object-Lock-Mode" 84 // AmzObjectLockRetainUntilDate sets object lock retain until date 85 AmzObjectLockRetainUntilDate = "X-Amz-Object-Lock-Retain-Until-Date" 86 // AmzObjectLockLegalHold sets object lock legal hold 87 AmzObjectLockLegalHold = "X-Amz-Object-Lock-Legal-Hold" 88) 89 90var timeSentinel = time.Unix(0, 0).UTC() 91 92// newFactory encloses New function with client cache. 93func newFactory() func(config *Config) (Client, *probe.Error) { 94 clientCache := make(map[uint32]*minio.Client) 95 var mutex sync.Mutex 96 97 // Return New function. 98 return func(config *Config) (Client, *probe.Error) { 99 // Creates a parsed URL. 100 targetURL := newClientURL(config.HostURL) 101 // By default enable HTTPs. 102 useTLS := true 103 if targetURL.Scheme == "http" { 104 useTLS = false 105 } 106 107 // Instantiate s3 108 s3Clnt := &S3Client{} 109 // Save the target URL. 110 s3Clnt.targetURL = targetURL 111 112 // Save if target supports virtual host style. 113 hostName := targetURL.Host 114 s3Clnt.virtualStyle = isVirtualHostStyle(hostName, config.Lookup) 115 isS3AcceleratedEndpoint := isAmazonAccelerated(hostName) 116 117 if s3Clnt.virtualStyle { 118 // If Google URL replace it with 'storage.googleapis.com' 119 if isGoogle(hostName) { 120 hostName = googleHostName 121 } 122 } 123 // Generate a hash out of s3Conf. 124 confHash := fnv.New32a() 125 confHash.Write([]byte(hostName + config.AccessKey + config.SecretKey + config.SessionToken)) 126 confSum := confHash.Sum32() 127 128 // Lookup previous cache by hash. 129 mutex.Lock() 130 defer mutex.Unlock() 131 var api *minio.Client 132 var found bool 133 if api, found = clientCache[confSum]; !found { 134 // if Signature version '4' use NewV4 directly. 135 creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, config.SessionToken) 136 // if Signature version '2' use NewV2 directly. 137 if strings.ToUpper(config.Signature) == "S3V2" { 138 creds = credentials.NewStaticV2(config.AccessKey, config.SecretKey, "") 139 } 140 141 var transport http.RoundTripper 142 143 if config.Transport != nil { 144 transport = config.Transport 145 } else { 146 tr := &http.Transport{ 147 Proxy: http.ProxyFromEnvironment, 148 DialContext: (&net.Dialer{ 149 Timeout: 10 * time.Second, 150 KeepAlive: 15 * time.Second, 151 }).DialContext, 152 MaxIdleConnsPerHost: 256, 153 IdleConnTimeout: 90 * time.Second, 154 TLSHandshakeTimeout: 10 * time.Second, 155 ExpectContinueTimeout: 10 * time.Second, 156 // Set this value so that the underlying transport round-tripper 157 // doesn't try to auto decode the body of objects with 158 // content-encoding set to `gzip`. 159 // 160 // Refer: 161 // https://golang.org/src/net/http/transport.go?h=roundTrip#L1843 162 DisableCompression: true, 163 } 164 if useTLS { 165 // Keep TLS config. 166 tlsConfig := &tls.Config{ 167 RootCAs: globalRootCAs, 168 // Can't use SSLv3 because of POODLE and BEAST 169 // Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher 170 // Can't use TLSv1.1 because of RC4 cipher usage 171 MinVersion: tls.VersionTLS12, 172 } 173 if config.Insecure { 174 tlsConfig.InsecureSkipVerify = true 175 } 176 tr.TLSClientConfig = tlsConfig 177 178 // Because we create a custom TLSClientConfig, we have to opt-in to HTTP/2. 179 // See https://github.com/golang/go/issues/14275 180 // 181 // TODO: Enable http2.0 when upstream issues related to HTTP/2 are fixed. 182 // 183 // if e = http2.ConfigureTransport(tr); e != nil { 184 // return nil, probe.NewError(e) 185 // } 186 } 187 transport = tr 188 } 189 190 if config.Debug { 191 if strings.EqualFold(config.Signature, "S3v4") { 192 transport = httptracer.GetNewTraceTransport(newTraceV4(), transport) 193 } else if strings.EqualFold(config.Signature, "S3v2") { 194 transport = httptracer.GetNewTraceTransport(newTraceV2(), transport) 195 } 196 } 197 198 // Not found. Instantiate a new MinIO 199 var e error 200 201 options := minio.Options{ 202 Creds: creds, 203 Secure: useTLS, 204 Region: os.Getenv("MC_REGION"), 205 BucketLookup: config.Lookup, 206 Transport: transport, 207 } 208 209 api, e = minio.New(hostName, &options) 210 if e != nil { 211 return nil, probe.NewError(e) 212 } 213 214 // If Amazon Accelerated URL is requested enable it. 215 if isS3AcceleratedEndpoint { 216 api.SetS3TransferAccelerate(amazonHostNameAccelerated) 217 } 218 219 // Set app info. 220 api.SetAppInfo(config.AppName, config.AppVersion) 221 222 // Cache the new MinIO Client with hash of config as key. 223 clientCache[confSum] = api 224 } 225 226 // Store the new api object. 227 s3Clnt.api = api 228 229 return s3Clnt, nil 230 } 231} 232 233// S3New returns an initialized S3Client structure. If debug is enabled, 234// it also enables an internal trace transport. 235var S3New = newFactory() 236 237// GetURL get url. 238func (c *S3Client) GetURL() ClientURL { 239 return c.targetURL.Clone() 240} 241 242// AddNotificationConfig - Add bucket notification 243func (c *S3Client) AddNotificationConfig(ctx context.Context, arn string, events []string, prefix, suffix string, ignoreExisting bool) *probe.Error { 244 bucket, _ := c.url2BucketAndObject() 245 // Validate total fields in ARN. 246 fields := strings.Split(arn, ":") 247 if len(fields) != 6 { 248 return errInvalidArgument() 249 } 250 251 // Get any enabled notification. 252 mb, e := c.api.GetBucketNotification(ctx, bucket) 253 if e != nil { 254 return probe.NewError(e) 255 } 256 257 accountArn := notification.NewArn(fields[1], fields[2], fields[3], fields[4], fields[5]) 258 nc := notification.NewConfig(accountArn) 259 260 // Configure events 261 for _, event := range events { 262 switch event { 263 case "put": 264 nc.AddEvents(notification.ObjectCreatedAll) 265 case "delete": 266 nc.AddEvents(notification.ObjectRemovedAll) 267 case "get": 268 nc.AddEvents(notification.ObjectAccessedAll) 269 case "replica": 270 nc.AddEvents(notification.EventType("s3:Replication:*")) 271 case "ilm": 272 nc.AddEvents(notification.EventType("s3:ObjectRestore:*")) 273 nc.AddEvents(notification.EventType("s3:ObjectTransition:*")) 274 default: 275 return errInvalidArgument().Trace(events...) 276 } 277 } 278 if prefix != "" { 279 nc.AddFilterPrefix(prefix) 280 } 281 if suffix != "" { 282 nc.AddFilterSuffix(suffix) 283 } 284 285 switch fields[2] { 286 case "sns": 287 if !mb.AddTopic(nc) { 288 return errInvalidArgument().Trace("Overlapping Topic configs") 289 } 290 case "sqs": 291 if !mb.AddQueue(nc) { 292 return errInvalidArgument().Trace("Overlapping Queue configs") 293 } 294 case "lambda": 295 if !mb.AddLambda(nc) { 296 return errInvalidArgument().Trace("Overlapping lambda configs") 297 } 298 default: 299 return errInvalidArgument().Trace(fields[2]) 300 } 301 302 // Set the new bucket configuration 303 if err := c.api.SetBucketNotification(ctx, bucket, mb); err != nil { 304 if ignoreExisting && strings.Contains(err.Error(), "An object key name filtering rule defined with overlapping prefixes, overlapping suffixes, or overlapping combinations of prefixes and suffixes for the same event types") { 305 return nil 306 } 307 return probe.NewError(err) 308 } 309 return nil 310} 311 312// RemoveNotificationConfig - Remove bucket notification 313func (c *S3Client) RemoveNotificationConfig(ctx context.Context, arn string, event string, prefix string, suffix string) *probe.Error { 314 bucket, _ := c.url2BucketAndObject() 315 // Remove all notification configs if arn is empty 316 if arn == "" { 317 if err := c.api.RemoveAllBucketNotification(ctx, bucket); err != nil { 318 return probe.NewError(err) 319 } 320 return nil 321 } 322 323 mb, e := c.api.GetBucketNotification(ctx, bucket) 324 if e != nil { 325 return probe.NewError(e) 326 } 327 328 fields := strings.Split(arn, ":") 329 if len(fields) != 6 { 330 return errInvalidArgument().Trace(fields...) 331 } 332 accountArn := notification.NewArn(fields[1], fields[2], fields[3], fields[4], fields[5]) 333 334 // if we are passed filters for either events, suffix or prefix, then only delete the single event that matches 335 // the arguments 336 if event != "" || suffix != "" || prefix != "" { 337 // Translate events to type events for comparison 338 events := strings.Split(event, ",") 339 var eventsTyped []notification.EventType 340 for _, e := range events { 341 switch e { 342 case "put": 343 eventsTyped = append(eventsTyped, notification.ObjectCreatedAll) 344 case "delete": 345 eventsTyped = append(eventsTyped, notification.ObjectRemovedAll) 346 case "get": 347 eventsTyped = append(eventsTyped, notification.ObjectAccessedAll) 348 case "replica": 349 eventsTyped = append(eventsTyped, notification.EventType("s3:Replication:*")) 350 case "ilm": 351 eventsTyped = append(eventsTyped, notification.EventType("s3:ObjectRestore:*")) 352 eventsTyped = append(eventsTyped, notification.EventType("s3:ObjectTransition:*")) 353 default: 354 return errInvalidArgument().Trace(events...) 355 } 356 } 357 var err error 358 // based on the arn type, we'll look for the event in the corresponding sublist and delete it if there's a match 359 switch fields[2] { 360 case "sns": 361 err = mb.RemoveTopicByArnEventsPrefixSuffix(accountArn, eventsTyped, prefix, suffix) 362 case "sqs": 363 err = mb.RemoveQueueByArnEventsPrefixSuffix(accountArn, eventsTyped, prefix, suffix) 364 case "lambda": 365 err = mb.RemoveLambdaByArnEventsPrefixSuffix(accountArn, eventsTyped, prefix, suffix) 366 default: 367 return errInvalidArgument().Trace(fields[2]) 368 } 369 if err != nil { 370 return probe.NewError(err) 371 } 372 373 } else { 374 // remove all events for matching arn 375 switch fields[2] { 376 case "sns": 377 mb.RemoveTopicByArn(accountArn) 378 case "sqs": 379 mb.RemoveQueueByArn(accountArn) 380 case "lambda": 381 mb.RemoveLambdaByArn(accountArn) 382 default: 383 return errInvalidArgument().Trace(fields[2]) 384 } 385 } 386 387 // Set the new bucket configuration 388 if e := c.api.SetBucketNotification(ctx, bucket, mb); e != nil { 389 return probe.NewError(e) 390 } 391 return nil 392} 393 394// NotificationConfig notification config 395type NotificationConfig struct { 396 ID string `json:"id"` 397 Arn string `json:"arn"` 398 Events []string `json:"events"` 399 Prefix string `json:"prefix"` 400 Suffix string `json:"suffix"` 401} 402 403// ListNotificationConfigs - List notification configs 404func (c *S3Client) ListNotificationConfigs(ctx context.Context, arn string) ([]NotificationConfig, *probe.Error) { 405 var configs []NotificationConfig 406 bucket, _ := c.url2BucketAndObject() 407 mb, e := c.api.GetBucketNotification(ctx, bucket) 408 if e != nil { 409 return nil, probe.NewError(e) 410 } 411 412 // Generate pretty event names from event types 413 prettyEventNames := func(eventsTypes []notification.EventType) []string { 414 var result []string 415 for _, eventType := range eventsTypes { 416 result = append(result, string(eventType)) 417 } 418 return result 419 } 420 421 getFilters := func(config notification.Config) (prefix, suffix string) { 422 if config.Filter == nil { 423 return 424 } 425 for _, filter := range config.Filter.S3Key.FilterRules { 426 if strings.ToLower(filter.Name) == "prefix" { 427 prefix = filter.Value 428 } 429 if strings.ToLower(filter.Name) == "suffix" { 430 suffix = filter.Value 431 } 432 433 } 434 return prefix, suffix 435 } 436 437 for _, config := range mb.TopicConfigs { 438 if arn != "" && config.Topic != arn { 439 continue 440 } 441 prefix, suffix := getFilters(config.Config) 442 configs = append(configs, NotificationConfig{ID: config.ID, 443 Arn: config.Topic, 444 Events: prettyEventNames(config.Events), 445 Prefix: prefix, 446 Suffix: suffix}) 447 } 448 449 for _, config := range mb.QueueConfigs { 450 if arn != "" && config.Queue != arn { 451 continue 452 } 453 prefix, suffix := getFilters(config.Config) 454 configs = append(configs, NotificationConfig{ID: config.ID, 455 Arn: config.Queue, 456 Events: prettyEventNames(config.Events), 457 Prefix: prefix, 458 Suffix: suffix}) 459 } 460 461 for _, config := range mb.LambdaConfigs { 462 if arn != "" && config.Lambda != arn { 463 continue 464 } 465 prefix, suffix := getFilters(config.Config) 466 configs = append(configs, NotificationConfig{ID: config.ID, 467 Arn: config.Lambda, 468 Events: prettyEventNames(config.Events), 469 Prefix: prefix, 470 Suffix: suffix}) 471 } 472 473 return configs, nil 474} 475 476// Supported content types 477var supportedContentTypes = []string{ 478 "csv", 479 "json", 480 "gzip", 481 "bzip2", 482} 483 484// set the SelectObjectOutputSerialization struct using options passed in by client. If unspecified, 485// default S3 API specified defaults 486func selectObjectOutputOpts(selOpts SelectObjectOpts, i minio.SelectObjectInputSerialization) minio.SelectObjectOutputSerialization { 487 var isOK bool 488 var recDelim, fldDelim, quoteChar, quoteEscChar, qf string 489 490 o := minio.SelectObjectOutputSerialization{} 491 if _, ok := selOpts.OutputSerOpts["json"]; ok { 492 jo := minio.JSONOutputOptions{} 493 if recDelim, isOK = selOpts.OutputSerOpts["json"][recordDelimiterType]; !isOK { 494 recDelim = "\n" 495 } 496 jo.SetRecordDelimiter(recDelim) 497 o.JSON = &jo 498 } 499 if _, ok := selOpts.OutputSerOpts["csv"]; ok { 500 ocsv := minio.CSVOutputOptions{} 501 if recDelim, isOK = selOpts.OutputSerOpts["csv"][recordDelimiterType]; !isOK { 502 recDelim = defaultRecordDelimiter 503 } 504 ocsv.SetRecordDelimiter(recDelim) 505 if fldDelim, isOK = selOpts.OutputSerOpts["csv"][fieldDelimiterType]; !isOK { 506 fldDelim = defaultFieldDelimiter 507 } 508 ocsv.SetFieldDelimiter(fldDelim) 509 if quoteChar, isOK = selOpts.OutputSerOpts["csv"][quoteCharacterType]; isOK { 510 ocsv.SetQuoteCharacter(quoteChar) 511 } 512 if quoteEscChar, isOK = selOpts.OutputSerOpts["csv"][quoteEscapeCharacterType]; isOK { 513 ocsv.SetQuoteEscapeCharacter(quoteEscChar) 514 } 515 if qf, isOK = selOpts.OutputSerOpts["csv"][quoteFieldsType]; isOK { 516 ocsv.SetQuoteFields(minio.CSVQuoteFields(qf)) 517 } 518 o.CSV = &ocsv 519 } 520 // default to CSV output if options left unspecified 521 if o.CSV == nil && o.JSON == nil { 522 if i.JSON != nil { 523 j := minio.JSONOutputOptions{} 524 j.SetRecordDelimiter("\n") 525 o.JSON = &j 526 } else { 527 ocsv := minio.CSVOutputOptions{} 528 ocsv.SetRecordDelimiter(defaultRecordDelimiter) 529 ocsv.SetFieldDelimiter(defaultFieldDelimiter) 530 o.CSV = &ocsv 531 } 532 } 533 return o 534} 535 536func trimCompressionFileExts(name string) string { 537 return strings.TrimSuffix(strings.TrimSuffix(strings.TrimSuffix(name, ".gz"), ".bz"), ".bz2") 538} 539 540// set the SelectObjectInputSerialization struct using options passed in by client. If unspecified, 541// default S3 API specified defaults 542func selectObjectInputOpts(selOpts SelectObjectOpts, object string) minio.SelectObjectInputSerialization { 543 var isOK bool 544 var recDelim, fldDelim, quoteChar, quoteEscChar, fileHeader, commentChar, typ string 545 546 i := minio.SelectObjectInputSerialization{} 547 if _, ok := selOpts.InputSerOpts["parquet"]; ok { 548 iparquet := minio.ParquetInputOptions{} 549 i.Parquet = &iparquet 550 } 551 if _, ok := selOpts.InputSerOpts["json"]; ok { 552 j := minio.JSONInputOptions{} 553 if typ = selOpts.InputSerOpts["json"][typeJSONType]; typ != "" { 554 j.SetType(minio.JSONType(typ)) 555 } 556 i.JSON = &j 557 } 558 if _, ok := selOpts.InputSerOpts["csv"]; ok { 559 icsv := minio.CSVInputOptions{} 560 icsv.SetRecordDelimiter(defaultRecordDelimiter) 561 if recDelim, isOK = selOpts.InputSerOpts["csv"][recordDelimiterType]; isOK { 562 icsv.SetRecordDelimiter(recDelim) 563 } 564 if fldDelim, isOK = selOpts.InputSerOpts["csv"][fieldDelimiterType]; isOK { 565 icsv.SetFieldDelimiter(fldDelim) 566 } 567 if quoteChar, isOK = selOpts.InputSerOpts["csv"][quoteCharacterType]; isOK { 568 icsv.SetQuoteCharacter(quoteChar) 569 } 570 if quoteEscChar, isOK = selOpts.InputSerOpts["csv"][quoteEscapeCharacterType]; isOK { 571 icsv.SetQuoteEscapeCharacter(quoteEscChar) 572 } 573 if fileHeader, isOK = selOpts.InputSerOpts["csv"][fileHeaderType]; isOK { 574 icsv.SetFileHeaderInfo(minio.CSVFileHeaderInfo(fileHeader)) 575 } 576 if commentChar, isOK = selOpts.InputSerOpts["csv"][commentCharType]; isOK { 577 icsv.SetComments(commentChar) 578 } 579 i.CSV = &icsv 580 } 581 if i.CSV == nil && i.JSON == nil && i.Parquet == nil { 582 ext := filepath.Ext(trimCompressionFileExts(object)) 583 if strings.Contains(ext, "csv") { 584 icsv := minio.CSVInputOptions{} 585 icsv.SetRecordDelimiter(defaultRecordDelimiter) 586 icsv.SetFieldDelimiter(defaultFieldDelimiter) 587 icsv.SetFileHeaderInfo(minio.CSVFileHeaderInfoUse) 588 i.CSV = &icsv 589 } 590 if strings.Contains(ext, "parquet") || strings.Contains(object, ".parquet") { 591 iparquet := minio.ParquetInputOptions{} 592 i.Parquet = &iparquet 593 } 594 if strings.Contains(ext, "json") { 595 ijson := minio.JSONInputOptions{} 596 ijson.SetType(minio.JSONLinesType) 597 i.JSON = &ijson 598 } 599 } 600 if i.CompressionType == "" { 601 i.CompressionType = selectCompressionType(selOpts, object) 602 } 603 return i 604} 605 606// get client specified compression type or default compression type from file extension 607func selectCompressionType(selOpts SelectObjectOpts, object string) minio.SelectCompressionType { 608 ext := filepath.Ext(object) 609 contentType := mimedb.TypeByExtension(ext) 610 611 if selOpts.CompressionType != "" { 612 return selOpts.CompressionType 613 } 614 if strings.Contains(ext, "parquet") || strings.Contains(object, ".parquet") { 615 return minio.SelectCompressionNONE 616 } 617 if contentType != "" { 618 if strings.Contains(contentType, "gzip") { 619 return minio.SelectCompressionGZIP 620 } else if strings.Contains(contentType, "bzip") { 621 return minio.SelectCompressionBZIP 622 } 623 } 624 return minio.SelectCompressionNONE 625} 626 627// Select - select object content wrapper. 628func (c *S3Client) Select(ctx context.Context, expression string, sse encrypt.ServerSide, selOpts SelectObjectOpts) (io.ReadCloser, *probe.Error) { 629 opts := minio.SelectObjectOptions{ 630 Expression: expression, 631 ExpressionType: minio.QueryExpressionTypeSQL, 632 // Set any encryption headers 633 ServerSideEncryption: sse, 634 } 635 636 bucket, object := c.url2BucketAndObject() 637 638 opts.InputSerialization = selectObjectInputOpts(selOpts, object) 639 opts.OutputSerialization = selectObjectOutputOpts(selOpts, opts.InputSerialization) 640 reader, e := c.api.SelectObjectContent(ctx, bucket, object, opts) 641 if e != nil { 642 return nil, probe.NewError(e) 643 } 644 return reader, nil 645} 646 647func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo { 648 var eventsInfo = make([]EventInfo, len(ninfo.Records)) 649 for i, record := range ninfo.Records { 650 bucketName := record.S3.Bucket.Name 651 var key string 652 // Unescape only if needed, look for URL encoded content. 653 if strings.Contains(record.S3.Object.Key, "%2F") { 654 var e error 655 key, e = url.QueryUnescape(record.S3.Object.Key) 656 if e != nil { 657 key = record.S3.Object.Key 658 } 659 } else { 660 key = record.S3.Object.Key 661 } 662 u := c.targetURL.Clone() 663 u.Path = path.Join(string(u.Separator), bucketName, key) 664 if strings.HasPrefix(record.EventName, "s3:ObjectCreated:") { 665 if strings.HasPrefix(record.EventName, "s3:ObjectCreated:Copy") { 666 eventsInfo[i] = EventInfo{ 667 Time: record.EventTime, 668 Size: record.S3.Object.Size, 669 UserMetadata: record.S3.Object.UserMetadata, 670 Path: u.String(), 671 Type: notification.ObjectCreatedCopy, 672 Host: record.Source.Host, 673 Port: record.Source.Port, 674 UserAgent: record.Source.UserAgent, 675 } 676 } else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutRetention") { 677 eventsInfo[i] = EventInfo{ 678 Time: record.EventTime, 679 Size: record.S3.Object.Size, 680 UserMetadata: record.S3.Object.UserMetadata, 681 Path: u.String(), 682 Type: notification.EventType("s3:ObjectCreated:PutRetention"), 683 Host: record.Source.Host, 684 Port: record.Source.Port, 685 UserAgent: record.Source.UserAgent, 686 } 687 } else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutLegalHold") { 688 eventsInfo[i] = EventInfo{ 689 Time: record.EventTime, 690 Size: record.S3.Object.Size, 691 UserMetadata: record.S3.Object.UserMetadata, 692 Path: u.String(), 693 Type: notification.EventType("s3:ObjectCreated:PutLegalHold"), 694 Host: record.Source.Host, 695 Port: record.Source.Port, 696 UserAgent: record.Source.UserAgent, 697 } 698 } else { 699 eventsInfo[i] = EventInfo{ 700 Time: record.EventTime, 701 Size: record.S3.Object.Size, 702 UserMetadata: record.S3.Object.UserMetadata, 703 Path: u.String(), 704 Type: notification.ObjectCreatedPut, 705 Host: record.Source.Host, 706 Port: record.Source.Port, 707 UserAgent: record.Source.UserAgent, 708 } 709 } 710 } else { 711 eventsInfo[i] = EventInfo{ 712 Time: record.EventTime, 713 Size: record.S3.Object.Size, 714 UserMetadata: record.S3.Object.UserMetadata, 715 Path: u.String(), 716 Type: notification.EventType(record.EventName), 717 Host: record.Source.Host, 718 Port: record.Source.Port, 719 UserAgent: record.Source.UserAgent, 720 } 721 } 722 } 723 return eventsInfo 724} 725 726// Watch - Start watching on all bucket events for a given account ID. 727func (c *S3Client) Watch(ctx context.Context, options WatchOptions) (*WatchObject, *probe.Error) { 728 // Extract bucket and object. 729 bucket, object := c.url2BucketAndObject() 730 731 // Validation 732 if bucket == "" && object != "" { 733 return nil, errInvalidArgument().Trace(bucket, object) 734 } 735 if object != "" && options.Prefix != "" { 736 return nil, errInvalidArgument().Trace(options.Prefix, object) 737 } 738 739 // Flag set to set the notification. 740 var events []string 741 for _, event := range options.Events { 742 switch event { 743 case "put": 744 events = append(events, string(notification.ObjectCreatedAll)) 745 case "delete": 746 events = append(events, string(notification.ObjectRemovedAll)) 747 case "get": 748 events = append(events, string(notification.ObjectAccessedAll)) 749 case "replica": 750 events = append(events, "s3:Replication:*") // TODO: add it to minio-go as constant 751 case "ilm": 752 events = append(events, "s3:ObjectRestore:*", "s3:ObjectTransition:*") // TODO: add it to minio-go as constant 753 case "bucket-creation": 754 events = append(events, string(notification.BucketCreatedAll)) 755 case "bucket-removal": 756 events = append(events, string(notification.BucketRemovedAll)) 757 default: 758 return nil, errInvalidArgument().Trace(event) 759 } 760 } 761 762 wo := &WatchObject{ 763 EventInfoChan: make(chan []EventInfo), 764 ErrorChan: make(chan *probe.Error), 765 DoneChan: make(chan struct{}), 766 } 767 768 var eventsCh <-chan notification.Info 769 if bucket != "" { 770 if object != "" && options.Prefix == "" { 771 options.Prefix = object 772 } 773 eventsCh = c.api.ListenBucketNotification(ctx, bucket, options.Prefix, options.Suffix, events) 774 } else { 775 eventsCh = c.api.ListenNotification(ctx, "", "", events) 776 } 777 778 go func() { 779 // Start listening on all bucket events. 780 for notificationInfo := range eventsCh { 781 if notificationInfo.Err != nil { 782 var perr *probe.Error 783 if minio.ToErrorResponse(notificationInfo.Err).Code == "NotImplemented" { 784 perr = probe.NewError(APINotImplemented{ 785 API: "Watch", 786 APIType: c.GetURL().String(), 787 }) 788 } else { 789 perr = probe.NewError(notificationInfo.Err) 790 } 791 wo.Errors() <- perr 792 } else { 793 wo.Events() <- c.notificationToEventsInfo(notificationInfo) 794 } 795 } 796 797 close(wo.EventInfoChan) 798 close(wo.ErrorChan) 799 }() 800 801 return wo, nil 802} 803 804// Get - get object with GET options. 805func (c *S3Client) Get(ctx context.Context, opts GetOptions) (io.ReadCloser, *probe.Error) { 806 bucket, object := c.url2BucketAndObject() 807 808 reader, e := c.api.GetObject(ctx, bucket, object, 809 minio.GetObjectOptions{ 810 ServerSideEncryption: opts.SSE, 811 VersionID: opts.VersionID, 812 }) 813 if e != nil { 814 errResponse := minio.ToErrorResponse(e) 815 if errResponse.Code == "NoSuchBucket" { 816 return nil, probe.NewError(BucketDoesNotExist{ 817 Bucket: bucket, 818 }) 819 } 820 if errResponse.Code == "InvalidBucketName" { 821 return nil, probe.NewError(BucketInvalid{ 822 Bucket: bucket, 823 }) 824 } 825 if errResponse.Code == "NoSuchKey" { 826 return nil, probe.NewError(ObjectMissing{}) 827 } 828 return nil, probe.NewError(e) 829 } 830 return reader, nil 831} 832 833// Copy - copy object, uses server side copy API. Also uses an abstracted API 834// such that large file sizes will be copied in multipart manner on server 835// side. 836func (c *S3Client) Copy(ctx context.Context, source string, opts CopyOptions, progress io.Reader) *probe.Error { 837 dstBucket, dstObject := c.url2BucketAndObject() 838 if dstBucket == "" { 839 return probe.NewError(BucketNameEmpty{}) 840 } 841 842 metadata := make(map[string]string, len(opts.metadata)) 843 for k, v := range opts.metadata { 844 metadata[k] = v 845 } 846 847 delete(metadata, "X-Amz-Storage-Class") 848 if opts.storageClass != "" { 849 metadata["X-Amz-Storage-Class"] = opts.storageClass 850 } 851 852 tokens := splitStr(source, string(c.targetURL.Separator), 3) 853 854 // Source object 855 srcOpts := minio.CopySrcOptions{ 856 Bucket: tokens[1], 857 Object: tokens[2], 858 Encryption: opts.srcSSE, 859 VersionID: opts.versionID, 860 } 861 862 destOpts := minio.CopyDestOptions{ 863 Bucket: dstBucket, 864 Object: dstObject, 865 Encryption: opts.tgtSSE, 866 Progress: progress, 867 Size: opts.size, 868 } 869 870 if lockModeStr, ok := metadata[AmzObjectLockMode]; ok { 871 destOpts.Mode = minio.RetentionMode(strings.ToUpper(lockModeStr)) 872 delete(metadata, AmzObjectLockMode) 873 } 874 875 if retainUntilDateStr, ok := metadata[AmzObjectLockRetainUntilDate]; ok { 876 delete(metadata, AmzObjectLockRetainUntilDate) 877 if t, e := time.Parse(time.RFC3339, retainUntilDateStr); e == nil { 878 destOpts.RetainUntilDate = t.UTC() 879 } 880 } 881 882 if lh, ok := metadata[AmzObjectLockLegalHold]; ok { 883 destOpts.LegalHold = minio.LegalHoldStatus(lh) 884 delete(metadata, AmzObjectLockLegalHold) 885 } 886 887 // Assign metadata after irrelevant parts are delete above 888 destOpts.UserMetadata = metadata 889 destOpts.ReplaceMetadata = len(metadata) > 0 890 891 var e error 892 if opts.disableMultipart || opts.size < 64*1024*1024 { 893 _, e = c.api.CopyObject(ctx, destOpts, srcOpts) 894 } else { 895 _, e = c.api.ComposeObject(ctx, destOpts, srcOpts) 896 } 897 898 if e != nil { 899 errResponse := minio.ToErrorResponse(e) 900 if errResponse.Code == "AccessDenied" { 901 return probe.NewError(PathInsufficientPermission{ 902 Path: c.targetURL.String(), 903 }) 904 } 905 if errResponse.Code == "NoSuchBucket" { 906 return probe.NewError(BucketDoesNotExist{ 907 Bucket: dstBucket, 908 }) 909 } 910 if errResponse.Code == "InvalidBucketName" { 911 return probe.NewError(BucketInvalid{ 912 Bucket: dstBucket, 913 }) 914 } 915 if errResponse.Code == "NoSuchKey" { 916 return probe.NewError(ObjectMissing{}) 917 } 918 return probe.NewError(e) 919 } 920 return nil 921} 922 923// Put - upload an object with custom metadata. 924func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progress io.Reader, putOpts PutOptions) (int64, *probe.Error) { 925 bucket, object := c.url2BucketAndObject() 926 if bucket == "" { 927 return 0, probe.NewError(BucketNameEmpty{}) 928 } 929 930 metadata := make(map[string]string, len(putOpts.metadata)) 931 for k, v := range putOpts.metadata { 932 metadata[k] = v 933 } 934 935 // Do not copy storage class, it needs to be specified in putOpts 936 delete(metadata, "X-Amz-Storage-Class") 937 938 contentType, ok := metadata["Content-Type"] 939 if ok { 940 delete(metadata, "Content-Type") 941 } else { 942 // Set content-type if not specified. 943 contentType = "application/octet-stream" 944 } 945 946 cacheControl, ok := metadata["Cache-Control"] 947 if ok { 948 delete(metadata, "Cache-Control") 949 } 950 951 contentEncoding, ok := metadata["Content-Encoding"] 952 if ok { 953 delete(metadata, "Content-Encoding") 954 } 955 956 contentDisposition, ok := metadata["Content-Disposition"] 957 if ok { 958 delete(metadata, "Content-Disposition") 959 } 960 961 contentLanguage, ok := metadata["Content-Language"] 962 if ok { 963 delete(metadata, "Content-Language") 964 } 965 966 var tagsMap map[string]string 967 tagsHdr, ok := metadata["X-Amz-Tagging"] 968 if ok { 969 tagsSet, e := tags.Parse(tagsHdr, true) 970 if e != nil { 971 return 0, probe.NewError(e) 972 } 973 tagsMap = tagsSet.ToMap() 974 delete(metadata, "X-Amz-Tagging") 975 } 976 977 lockModeStr, ok := metadata[AmzObjectLockMode] 978 lockMode := minio.RetentionMode("") 979 if ok { 980 lockMode = minio.RetentionMode(strings.ToUpper(lockModeStr)) 981 delete(metadata, AmzObjectLockMode) 982 } 983 984 retainUntilDate := timeSentinel 985 retainUntilDateStr, ok := metadata[AmzObjectLockRetainUntilDate] 986 if ok { 987 delete(metadata, AmzObjectLockRetainUntilDate) 988 if t, e := time.Parse(time.RFC3339, retainUntilDateStr); e == nil { 989 retainUntilDate = t.UTC() 990 } 991 } 992 993 opts := minio.PutObjectOptions{ 994 UserMetadata: metadata, 995 UserTags: tagsMap, 996 Progress: progress, 997 ContentType: contentType, 998 CacheControl: cacheControl, 999 ContentDisposition: contentDisposition, 1000 ContentEncoding: contentEncoding, 1001 ContentLanguage: contentLanguage, 1002 StorageClass: strings.ToUpper(putOpts.storageClass), 1003 ServerSideEncryption: putOpts.sse, 1004 SendContentMd5: putOpts.md5, 1005 DisableMultipart: putOpts.disableMultipart, 1006 PartSize: putOpts.multipartSize, 1007 NumThreads: putOpts.multipartThreads, 1008 } 1009 1010 if !retainUntilDate.IsZero() && !retainUntilDate.Equal(timeSentinel) { 1011 opts.RetainUntilDate = retainUntilDate 1012 } 1013 1014 if lockModeStr != "" { 1015 opts.Mode = lockMode 1016 opts.SendContentMd5 = true 1017 } 1018 1019 if lh, ok := metadata[AmzObjectLockLegalHold]; ok { 1020 delete(metadata, AmzObjectLockLegalHold) 1021 opts.LegalHold = minio.LegalHoldStatus(strings.ToUpper(lh)) 1022 opts.SendContentMd5 = true 1023 } 1024 1025 ui, e := c.api.PutObject(ctx, bucket, object, reader, size, opts) 1026 if e != nil { 1027 errResponse := minio.ToErrorResponse(e) 1028 if errResponse.Code == "UnexpectedEOF" || e == io.EOF { 1029 return ui.Size, probe.NewError(UnexpectedEOF{ 1030 TotalSize: size, 1031 TotalWritten: ui.Size, 1032 }) 1033 } 1034 if errResponse.Code == "AccessDenied" { 1035 return ui.Size, probe.NewError(PathInsufficientPermission{ 1036 Path: c.targetURL.String(), 1037 }) 1038 } 1039 if errResponse.Code == "MethodNotAllowed" { 1040 return ui.Size, probe.NewError(ObjectAlreadyExists{ 1041 Object: object, 1042 }) 1043 } 1044 if errResponse.Code == "XMinioObjectExistsAsDirectory" { 1045 return ui.Size, probe.NewError(ObjectAlreadyExistsAsDirectory{ 1046 Object: object, 1047 }) 1048 } 1049 if errResponse.Code == "NoSuchBucket" { 1050 return ui.Size, probe.NewError(BucketDoesNotExist{ 1051 Bucket: bucket, 1052 }) 1053 } 1054 if errResponse.Code == "InvalidBucketName" { 1055 return ui.Size, probe.NewError(BucketInvalid{ 1056 Bucket: bucket, 1057 }) 1058 } 1059 if errResponse.Code == "NoSuchKey" { 1060 return ui.Size, probe.NewError(ObjectMissing{}) 1061 } 1062 return ui.Size, probe.NewError(e) 1063 } 1064 return ui.Size, nil 1065} 1066 1067// Remove incomplete uploads. 1068func (c *S3Client) removeIncompleteObjects(ctx context.Context, bucket string, objectsCh <-chan minio.ObjectInfo) <-chan minio.RemoveObjectError { 1069 removeObjectErrorCh := make(chan minio.RemoveObjectError) 1070 1071 // Goroutine reads from objectsCh and sends error to removeObjectErrorCh if any. 1072 go func() { 1073 defer close(removeObjectErrorCh) 1074 1075 for info := range objectsCh { 1076 if err := c.api.RemoveIncompleteUpload(ctx, bucket, info.Key); err != nil { 1077 removeObjectErrorCh <- minio.RemoveObjectError{ObjectName: info.Key, Err: err} 1078 } 1079 } 1080 }() 1081 1082 return removeObjectErrorCh 1083} 1084 1085// AddUserAgent - add custom user agent. 1086func (c *S3Client) AddUserAgent(app string, version string) { 1087 c.api.SetAppInfo(app, version) 1088} 1089 1090// Remove - remove object or bucket(s). 1091func (c *S3Client) Remove(ctx context.Context, isIncomplete, isRemoveBucket, isBypass bool, contentCh <-chan *ClientContent) <-chan *probe.Error { 1092 errorCh := make(chan *probe.Error) 1093 1094 prevBucket := "" 1095 // Maintain objectsCh, statusCh for each bucket 1096 var objectsCh chan minio.ObjectInfo 1097 var statusCh <-chan minio.RemoveObjectError 1098 opts := minio.RemoveObjectsOptions{ 1099 GovernanceBypass: isBypass, 1100 } 1101 1102 go func() { 1103 defer close(errorCh) 1104 if isRemoveBucket { 1105 if _, object := c.url2BucketAndObject(); object != "" { 1106 errorCh <- probe.NewError(errors.New( 1107 "use `mc rm` command to delete prefixes, or point your" + 1108 " bucket directly, `mc rb <alias>/<bucket-name>/`")) 1109 return 1110 } 1111 } 1112 for { 1113 select { 1114 case <-ctx.Done(): 1115 errorCh <- probe.NewError(ctx.Err()) 1116 return 1117 case content, ok := <-contentCh: 1118 if !ok { 1119 goto breakout 1120 } 1121 1122 // Convert content.URL.Path to objectName for objectsCh. 1123 bucket, objectName := c.splitPath(content.URL.Path) 1124 objectVersionID := content.VersionID 1125 1126 // We don't treat path when bucket is 1127 // empty, just skip it when it happens. 1128 if bucket == "" { 1129 continue 1130 } 1131 1132 // Init objectsCh the first time. 1133 if prevBucket == "" { 1134 objectsCh = make(chan minio.ObjectInfo) 1135 prevBucket = bucket 1136 if isIncomplete { 1137 statusCh = c.removeIncompleteObjects(ctx, bucket, objectsCh) 1138 } else { 1139 statusCh = c.api.RemoveObjects(ctx, bucket, objectsCh, opts) 1140 } 1141 } 1142 1143 if prevBucket != bucket { 1144 if objectsCh != nil { 1145 close(objectsCh) 1146 } 1147 for removeStatus := range statusCh { 1148 errorCh <- probe.NewError(removeStatus.Err) 1149 } 1150 // Remove bucket if it qualifies. 1151 if isRemoveBucket && !isIncomplete { 1152 if err := c.api.RemoveBucket(ctx, prevBucket); err != nil { 1153 errorCh <- probe.NewError(err) 1154 } 1155 } 1156 // Re-init objectsCh for next bucket 1157 objectsCh = make(chan minio.ObjectInfo) 1158 if isIncomplete { 1159 statusCh = c.removeIncompleteObjects(ctx, bucket, objectsCh) 1160 } else { 1161 statusCh = c.api.RemoveObjects(ctx, bucket, objectsCh, opts) 1162 } 1163 prevBucket = bucket 1164 } 1165 1166 if objectName != "" { 1167 // Send object name once but continuously checks for pending 1168 // errors in parallel, the reason is that minio-go RemoveObjects 1169 // can block if there is any pending error not received yet. 1170 sent := false 1171 for !sent { 1172 select { 1173 case objectsCh <- minio.ObjectInfo{Key: objectName, VersionID: objectVersionID}: 1174 sent = true 1175 case removeStatus := <-statusCh: 1176 errorCh <- probe.NewError(removeStatus.Err) 1177 } 1178 } 1179 } else { 1180 // end of bucket - close the objectsCh 1181 if objectsCh != nil { 1182 close(objectsCh) 1183 } 1184 objectsCh = nil 1185 } 1186 } 1187 } 1188 1189 breakout: 1190 // Close objectsCh at end of contentCh 1191 if objectsCh != nil { 1192 close(objectsCh) 1193 } 1194 // Write remove objects status to errorCh 1195 if statusCh != nil { 1196 for removeStatus := range statusCh { 1197 // If the removeStatus error message is: 1198 // "Object is WORM protected and cannot be overwritten", 1199 // it is too generic. We have the object's name and vid. 1200 // Adding the object's name and version id into the error msg 1201 removeStatus.Err = errors.New(strings.Replace( 1202 removeStatus.Err.Error(), "Object is WORM protected", 1203 "Object, '"+removeStatus.ObjectName+" (Version ID="+ 1204 removeStatus.VersionID+")' is WORM protected", 1)) 1205 errorCh <- probe.NewError(removeStatus.Err) 1206 } 1207 } 1208 // Remove last bucket if it qualifies. 1209 if isRemoveBucket && prevBucket != "" && !isIncomplete { 1210 if err := c.api.RemoveBucket(ctx, prevBucket); err != nil { 1211 errorCh <- probe.NewError(err) 1212 } 1213 } 1214 }() 1215 return errorCh 1216} 1217 1218// MakeBucket - make a new bucket. 1219func (c *S3Client) MakeBucket(ctx context.Context, region string, ignoreExisting, withLock bool) *probe.Error { 1220 bucket, object := c.url2BucketAndObject() 1221 if bucket == "" { 1222 return probe.NewError(BucketNameEmpty{}) 1223 } 1224 if object != "" { 1225 if !strings.HasSuffix(object, string(c.targetURL.Separator)) { 1226 object += string(c.targetURL.Separator) 1227 } 1228 var retried bool 1229 for { 1230 _, e := c.api.PutObject(ctx, bucket, object, bytes.NewReader([]byte("")), 0, 1231 // Always send Content-MD5 to succeed with bucket with 1232 // locking enabled. There is no performance hit since 1233 // this is always an empty object 1234 minio.PutObjectOptions{SendContentMd5: true}, 1235 ) 1236 if e == nil { 1237 return nil 1238 } 1239 if retried { 1240 return probe.NewError(e) 1241 } 1242 switch minio.ToErrorResponse(e).Code { 1243 case "NoSuchBucket": 1244 opts := minio.MakeBucketOptions{Region: region, ObjectLocking: withLock} 1245 if e = c.api.MakeBucket(ctx, bucket, opts); e != nil { 1246 return probe.NewError(e) 1247 } 1248 retried = true 1249 continue 1250 } 1251 return probe.NewError(e) 1252 } 1253 } 1254 1255 var e error 1256 opts := minio.MakeBucketOptions{Region: region, ObjectLocking: withLock} 1257 if e = c.api.MakeBucket(ctx, bucket, opts); e != nil { 1258 // Ignore bucket already existing error when ignoreExisting flag is enabled 1259 if ignoreExisting { 1260 switch minio.ToErrorResponse(e).Code { 1261 case "BucketAlreadyOwnedByYou": 1262 fallthrough 1263 case "BucketAlreadyExists": 1264 return nil 1265 } 1266 } 1267 return probe.NewError(e) 1268 } 1269 return nil 1270} 1271 1272// RemoveBucket removes a bucket, forcibly if asked 1273func (c *S3Client) RemoveBucket(ctx context.Context, forceRemove bool) *probe.Error { 1274 bucket, object := c.url2BucketAndObject() 1275 if bucket == "" { 1276 return probe.NewError(BucketNameEmpty{}) 1277 } 1278 if object != "" { 1279 return errInvalidArgument() 1280 } 1281 1282 opts := minio.BucketOptions{ForceDelete: forceRemove} 1283 if e := c.api.RemoveBucketWithOptions(ctx, bucket, opts); e != nil { 1284 return probe.NewError(e) 1285 } 1286 return nil 1287} 1288 1289// GetAccessRules - get configured policies from the server 1290func (c *S3Client) GetAccessRules(ctx context.Context) (map[string]string, *probe.Error) { 1291 bucket, object := c.url2BucketAndObject() 1292 if bucket == "" { 1293 return map[string]string{}, probe.NewError(BucketNameEmpty{}) 1294 } 1295 policies := map[string]string{} 1296 policyStr, e := c.api.GetBucketPolicy(ctx, bucket) 1297 if e != nil { 1298 return nil, probe.NewError(e) 1299 } 1300 if policyStr == "" { 1301 return policies, nil 1302 } 1303 var p policy.BucketAccessPolicy 1304 if e = json.Unmarshal([]byte(policyStr), &p); e != nil { 1305 return nil, probe.NewError(e) 1306 } 1307 policyRules := policy.GetPolicies(p.Statements, bucket, object) 1308 // Hide policy data structure at this level 1309 for k, v := range policyRules { 1310 policies[k] = string(v) 1311 } 1312 return policies, nil 1313} 1314 1315// GetAccess get access policy permissions. 1316func (c *S3Client) GetAccess(ctx context.Context) (string, string, *probe.Error) { 1317 bucket, object := c.url2BucketAndObject() 1318 if bucket == "" { 1319 return "", "", probe.NewError(BucketNameEmpty{}) 1320 } 1321 policyStr, e := c.api.GetBucketPolicy(ctx, bucket) 1322 if e != nil { 1323 return "", "", probe.NewError(e) 1324 } 1325 if policyStr == "" { 1326 return string(policy.BucketPolicyNone), policyStr, nil 1327 } 1328 var p policy.BucketAccessPolicy 1329 if e = json.Unmarshal([]byte(policyStr), &p); e != nil { 1330 return "", "", probe.NewError(e) 1331 } 1332 pType := string(policy.GetPolicy(p.Statements, bucket, object)) 1333 if pType == string(policy.BucketPolicyNone) && policyStr != "" { 1334 pType = "custom" 1335 } 1336 return pType, policyStr, nil 1337} 1338 1339// SetAccess set access policy permissions. 1340func (c *S3Client) SetAccess(ctx context.Context, bucketPolicy string, isJSON bool) *probe.Error { 1341 bucket, object := c.url2BucketAndObject() 1342 if bucket == "" { 1343 return probe.NewError(BucketNameEmpty{}) 1344 } 1345 if isJSON { 1346 if e := c.api.SetBucketPolicy(ctx, bucket, bucketPolicy); e != nil { 1347 return probe.NewError(e) 1348 } 1349 return nil 1350 } 1351 policyStr, e := c.api.GetBucketPolicy(ctx, bucket) 1352 if e != nil { 1353 return probe.NewError(e) 1354 } 1355 var p = policy.BucketAccessPolicy{Version: "2012-10-17"} 1356 if policyStr != "" { 1357 if e = json.Unmarshal([]byte(policyStr), &p); e != nil { 1358 return probe.NewError(e) 1359 } 1360 } 1361 p.Statements = policy.SetPolicy(p.Statements, policy.BucketPolicy(bucketPolicy), bucket, object) 1362 if len(p.Statements) == 0 { 1363 if e = c.api.SetBucketPolicy(ctx, bucket, ""); e != nil { 1364 return probe.NewError(e) 1365 } 1366 return nil 1367 } 1368 policyB, e := json.Marshal(p) 1369 if e != nil { 1370 return probe.NewError(e) 1371 } 1372 if e = c.api.SetBucketPolicy(ctx, bucket, string(policyB)); e != nil { 1373 return probe.NewError(e) 1374 } 1375 return nil 1376} 1377 1378// listObjectWrapper - select ObjectList mode depending on arguments 1379func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string, isRecursive bool, timeRef time.Time, withVersions, withDeleteMarkers bool, metadata bool, maxKeys int) <-chan minio.ObjectInfo { 1380 if !timeRef.IsZero() || withVersions { 1381 return c.listVersions(ctx, bucket, object, isRecursive, timeRef, withVersions, withDeleteMarkers) 1382 } 1383 1384 if isGoogle(c.targetURL.Host) { 1385 // Google Cloud S3 layer doesn't implement ListObjectsV2 implementation 1386 // https://github.com/minio/mc/issues/3073 1387 return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true, MaxKeys: maxKeys}) 1388 } 1389 return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata, MaxKeys: maxKeys}) 1390} 1391 1392func (c *S3Client) statIncompleteUpload(ctx context.Context, bucket, object string) (*ClientContent, *probe.Error) { 1393 nonRecursive := false 1394 objectMetadata := &ClientContent{} 1395 // Prefix to pass to minio-go listing in order to fetch a given object/directory 1396 prefix := strings.TrimRight(object, string(c.targetURL.Separator)) 1397 1398 for objectMultipartInfo := range c.api.ListIncompleteUploads(ctx, bucket, prefix, nonRecursive) { 1399 if objectMultipartInfo.Err != nil { 1400 return nil, probe.NewError(objectMultipartInfo.Err) 1401 } 1402 1403 if objectMultipartInfo.Key == object { 1404 objectMetadata.URL = c.targetURL.Clone() 1405 objectMetadata.Time = objectMultipartInfo.Initiated 1406 objectMetadata.Size = objectMultipartInfo.Size 1407 objectMetadata.Type = os.FileMode(0664) 1408 objectMetadata.Metadata = map[string]string{} 1409 return objectMetadata, nil 1410 } 1411 1412 if strings.HasSuffix(objectMultipartInfo.Key, string(c.targetURL.Separator)) { 1413 objectMetadata.URL = c.targetURL.Clone() 1414 objectMetadata.Type = os.ModeDir 1415 objectMetadata.Metadata = map[string]string{} 1416 return objectMetadata, nil 1417 } 1418 } 1419 return nil, probe.NewError(ObjectMissing{}) 1420} 1421 1422// Stat - send a 'HEAD' on a bucket or object to fetch its metadata. It also returns 1423// a DIR type content if a prefix does exist in the server. 1424func (c *S3Client) Stat(ctx context.Context, opts StatOptions) (*ClientContent, *probe.Error) { 1425 c.Lock() 1426 defer c.Unlock() 1427 bucket, object := c.url2BucketAndObject() 1428 1429 // Bucket name cannot be empty, stat on URL has no meaning. 1430 if bucket == "" { 1431 url := c.targetURL.Clone() 1432 url.Path = string(c.targetURL.Separator) 1433 return &ClientContent{URL: url, 1434 Size: 0, 1435 Type: os.ModeDir, 1436 }, nil 1437 } 1438 1439 if object == "" { 1440 content, err := c.bucketStat(ctx, bucket) 1441 if err != nil { 1442 return nil, err.Trace(bucket) 1443 } 1444 return content, nil 1445 } 1446 1447 // If the request is for incomplete upload stat, handle it here. 1448 if opts.incomplete { 1449 return c.statIncompleteUpload(ctx, bucket, object) 1450 } 1451 1452 // The following code tries to calculate if a given prefix/object does really exist 1453 // using minio-go listing API. The following inputs are supported: 1454 // - /path/to/existing/object 1455 // - /path/to/existing_directory 1456 // - /path/to/existing_directory/ 1457 // - /path/to/empty_directory 1458 // - /path/to/empty_directory/ 1459 1460 // First an HEAD call is issued, this is faster than doing listing even if the object exists 1461 // because the list could be very large. At the same time, the HEAD call is avoided if the 1462 // object already contains a trailing prefix or we passed rewind flag to know the object version 1463 // created just before the rewind parameter. 1464 if !strings.HasSuffix(object, string(c.targetURL.Separator)) && opts.timeRef.IsZero() { 1465 // Issue HEAD request first but ignore no such key error 1466 // so we can check if there is such prefix which exists 1467 ctnt, err := c.getObjectStat(ctx, bucket, object, minio.StatObjectOptions{ServerSideEncryption: opts.sse, VersionID: opts.versionID}) 1468 if err == nil { 1469 return ctnt, nil 1470 } 1471 1472 // Ignore object missing error but return for other errors 1473 if !errors.As(err.ToGoError(), &ObjectMissing{}) && !errors.As(err.ToGoError(), &ObjectIsDeleteMarker{}) { 1474 return nil, err 1475 } 1476 } 1477 1478 nonRecursive := false 1479 // Prefix to pass to minio-go listing in order to fetch if a prefix exists 1480 prefix := strings.TrimRight(object, string(c.targetURL.Separator)) 1481 1482 for objectStat := range c.listObjectWrapper(ctx, bucket, prefix, nonRecursive, opts.timeRef, false, false, false, 1) { 1483 if objectStat.Err != nil { 1484 return nil, probe.NewError(objectStat.Err) 1485 } 1486 1487 if object == objectStat.Key || object == strings.TrimSuffix(objectStat.Key, string(c.targetURL.Separator)) { 1488 return c.objectInfo2ClientContent(bucket, objectStat), nil 1489 } 1490 break 1491 } 1492 1493 return nil, probe.NewError(ObjectMissing{opts.timeRef}) 1494} 1495 1496// getObjectStat returns the metadata of an object from a HEAD call. 1497func (c *S3Client) getObjectStat(ctx context.Context, bucket, object string, opts minio.StatObjectOptions) (*ClientContent, *probe.Error) { 1498 objectStat, e := c.api.StatObject(ctx, bucket, object, opts) 1499 objectMetadata := c.objectInfo2ClientContent(bucket, objectStat) 1500 if e != nil { 1501 errResponse := minio.ToErrorResponse(e) 1502 if errResponse.Code == "AccessDenied" { 1503 return nil, probe.NewError(PathInsufficientPermission{Path: c.targetURL.String()}) 1504 } 1505 if errResponse.Code == "NoSuchBucket" { 1506 return nil, probe.NewError(BucketDoesNotExist{ 1507 Bucket: bucket, 1508 }) 1509 } 1510 if errResponse.Code == "InvalidBucketName" { 1511 return nil, probe.NewError(BucketInvalid{ 1512 Bucket: bucket, 1513 }) 1514 } 1515 if errResponse.Code == "NoSuchKey" { 1516 if objectMetadata.IsDeleteMarker { 1517 return nil, probe.NewError(ObjectIsDeleteMarker{}) 1518 } 1519 return nil, probe.NewError(ObjectMissing{}) 1520 } 1521 return nil, probe.NewError(e) 1522 } 1523 // HEAD with a version ID will not return version in the response headers 1524 if objectMetadata.VersionID == "" { 1525 objectMetadata.VersionID = opts.VersionID 1526 } 1527 return objectMetadata, nil 1528} 1529 1530func isAmazon(host string) bool { 1531 return s3utils.IsAmazonEndpoint(url.URL{Host: host}) 1532} 1533 1534func isAmazonChina(host string) bool { 1535 amazonS3ChinaHost := regexp.MustCompile(`^s3\.(cn.*?)\.amazonaws\.com\.cn$`) 1536 parts := amazonS3ChinaHost.FindStringSubmatch(host) 1537 return len(parts) > 1 1538} 1539 1540func isAmazonAccelerated(host string) bool { 1541 return host == "s3-accelerate.amazonaws.com" 1542} 1543 1544func isGoogle(host string) bool { 1545 return s3utils.IsGoogleEndpoint(url.URL{Host: host}) 1546} 1547 1548// Figure out if the URL is of 'virtual host' style. 1549// Use lookup from config to see if dns/path style look 1550// up should be used. If it is set to "auto", use virtual 1551// style for supported hosts such as Amazon S3 and Google 1552// Cloud Storage. Otherwise, default to path style 1553func isVirtualHostStyle(host string, lookup minio.BucketLookupType) bool { 1554 if lookup == minio.BucketLookupDNS { 1555 return true 1556 } 1557 if lookup == minio.BucketLookupPath { 1558 return false 1559 } 1560 return isAmazon(host) && !isAmazonChina(host) || isGoogle(host) || isAmazonAccelerated(host) 1561} 1562 1563// url2BucketAndObject gives bucketName and objectName from URL path. 1564func (c *S3Client) url2BucketAndObject() (bucketName, objectName string) { 1565 path := c.targetURL.Path 1566 // Convert any virtual host styled requests. 1567 // 1568 // For the time being this check is introduced for S3, 1569 // If you have custom virtual styled hosts please. 1570 // List them below. 1571 if c.virtualStyle { 1572 var bucket string 1573 hostIndex := strings.Index(c.targetURL.Host, "s3") 1574 if hostIndex != -1 && !matchS3InHost(c.targetURL.Host) { 1575 hostIndex = -1 1576 } 1577 if hostIndex == -1 { 1578 hostIndex = strings.Index(c.targetURL.Host, "s3-accelerate") 1579 } 1580 if hostIndex == -1 { 1581 hostIndex = strings.Index(c.targetURL.Host, "storage.googleapis") 1582 } 1583 if hostIndex > 0 { 1584 bucket = c.targetURL.Host[:hostIndex-1] 1585 path = string(c.targetURL.Separator) + bucket + c.targetURL.Path 1586 } 1587 } 1588 tokens := splitStr(path, string(c.targetURL.Separator), 3) 1589 return tokens[1], tokens[2] 1590} 1591 1592// splitPath split path into bucket and object. 1593func (c *S3Client) splitPath(path string) (bucketName, objectName string) { 1594 path = strings.TrimPrefix(path, string(c.targetURL.Separator)) 1595 1596 // Handle path if its virtual style. 1597 if c.virtualStyle { 1598 hostIndex := strings.Index(c.targetURL.Host, "s3") 1599 if hostIndex == -1 { 1600 hostIndex = strings.Index(c.targetURL.Host, "s3-accelerate") 1601 } 1602 if hostIndex == -1 { 1603 hostIndex = strings.Index(c.targetURL.Host, "storage.googleapis") 1604 } 1605 if hostIndex > 0 { 1606 bucketName = c.targetURL.Host[:hostIndex-1] 1607 objectName = path 1608 return bucketName, objectName 1609 } 1610 } 1611 1612 tokens := splitStr(path, string(c.targetURL.Separator), 2) 1613 return tokens[0], tokens[1] 1614} 1615 1616/// Bucket API operations. 1617 1618func (c *S3Client) listVersions(ctx context.Context, b, o string, isRecursive bool, timeRef time.Time, includeOlderVersions, withDeleteMarkers bool) chan minio.ObjectInfo { 1619 objectInfoCh := make(chan minio.ObjectInfo) 1620 go func() { 1621 defer close(objectInfoCh) 1622 c.listVersionsRoutine(ctx, b, o, isRecursive, timeRef, includeOlderVersions, withDeleteMarkers, objectInfoCh) 1623 }() 1624 return objectInfoCh 1625} 1626 1627func (c *S3Client) listVersionsRoutine(ctx context.Context, b, o string, isRecursive bool, timeRef time.Time, includeOlderVersions, withDeleteMarkers bool, objectInfoCh chan minio.ObjectInfo) { 1628 if timeRef.IsZero() { 1629 timeRef = time.Now().UTC() 1630 } 1631 1632 var buckets []string 1633 if b == "" { 1634 bucketsInfo, err := c.api.ListBuckets(ctx) 1635 if err != nil { 1636 objectInfoCh <- minio.ObjectInfo{ 1637 Err: err, 1638 } 1639 return 1640 } 1641 for _, b := range bucketsInfo { 1642 buckets = append(buckets, b.Name) 1643 } 1644 } else { 1645 buckets = append(buckets, b) 1646 } 1647 1648 for _, b := range buckets { 1649 var skipKey string 1650 for objectVersion := range c.api.ListObjects(ctx, b, minio.ListObjectsOptions{ 1651 Prefix: o, 1652 Recursive: isRecursive, 1653 WithVersions: true, 1654 }) { 1655 if objectVersion.Err != nil { 1656 objectInfoCh <- objectVersion 1657 continue 1658 } 1659 1660 if !includeOlderVersions && skipKey == objectVersion.Key { 1661 // Skip current version if not asked to list all versions 1662 // and we already listed the current object key name 1663 continue 1664 } 1665 1666 if objectVersion.LastModified.Before(timeRef) { 1667 skipKey = objectVersion.Key 1668 1669 // Skip if this is a delete marker and we are not asked to list it 1670 if !withDeleteMarkers && objectVersion.IsDeleteMarker { 1671 continue 1672 } 1673 1674 objectInfoCh <- objectVersion 1675 } 1676 } 1677 } 1678} 1679 1680// List - list at delimited path, if not recursive. 1681func (c *S3Client) List(ctx context.Context, opts ListOptions) <-chan *ClientContent { 1682 c.Lock() 1683 defer c.Unlock() 1684 1685 contentCh := make(chan *ClientContent) 1686 go func() { 1687 defer close(contentCh) 1688 if !opts.TimeRef.IsZero() || opts.WithOlderVersions { 1689 c.versionedList(ctx, contentCh, opts) 1690 } else { 1691 c.unversionedList(ctx, contentCh, opts) 1692 } 1693 }() 1694 1695 return contentCh 1696} 1697 1698// versionedList returns objects versions if the S3 backend supports versioning, 1699// it falls back to the regular listing if not. 1700func (c *S3Client) versionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { 1701 b, o := c.url2BucketAndObject() 1702 switch { 1703 case b == "" && o == "": 1704 buckets, err := c.api.ListBuckets(ctx) 1705 if err != nil { 1706 contentCh <- &ClientContent{ 1707 Err: probe.NewError(err), 1708 } 1709 return 1710 } 1711 1712 for _, bucket := range buckets { 1713 if opts.ShowDir != DirLast { 1714 contentCh <- c.bucketInfo2ClientContent(bucket) 1715 } 1716 for objectVersion := range c.listVersions(ctx, bucket.Name, "", 1717 opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers) { 1718 if objectVersion.Err != nil { 1719 if minio.ToErrorResponse(objectVersion.Err).Code == "NotImplemented" { 1720 goto noVersioning 1721 } else { 1722 contentCh <- &ClientContent{ 1723 Err: probe.NewError(objectVersion.Err), 1724 } 1725 continue 1726 } 1727 } 1728 contentCh <- c.objectInfo2ClientContent(bucket.Name, objectVersion) 1729 } 1730 1731 if opts.ShowDir == DirLast { 1732 contentCh <- c.bucketInfo2ClientContent(bucket) 1733 } 1734 } 1735 return 1736 default: 1737 for objectVersion := range c.listVersions(ctx, b, o, 1738 opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers) { 1739 if objectVersion.Err != nil { 1740 if minio.ToErrorResponse(objectVersion.Err).Code == "NotImplemented" { 1741 goto noVersioning 1742 } else { 1743 contentCh <- &ClientContent{ 1744 Err: probe.NewError(objectVersion.Err), 1745 } 1746 continue 1747 } 1748 } 1749 contentCh <- c.objectInfo2ClientContent(b, objectVersion) 1750 } 1751 return 1752 } 1753 1754noVersioning: 1755 c.unversionedList(ctx, contentCh, opts) 1756 1757} 1758 1759// unversionedList is the non versioned S3 listing 1760func (c *S3Client) unversionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { 1761 if opts.Incomplete { 1762 if opts.Recursive { 1763 c.listIncompleteRecursiveInRoutine(ctx, contentCh, opts) 1764 } else { 1765 c.listIncompleteInRoutine(ctx, contentCh, opts) 1766 } 1767 } else { 1768 if opts.Recursive { 1769 c.listRecursiveInRoutine(ctx, contentCh, opts) 1770 } else { 1771 c.listInRoutine(ctx, contentCh, opts) 1772 } 1773 } 1774} 1775 1776func (c *S3Client) listIncompleteInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { 1777 // get bucket and object from URL. 1778 b, o := c.url2BucketAndObject() 1779 switch { 1780 case b == "" && o == "": 1781 buckets, err := c.api.ListBuckets(ctx) 1782 if err != nil { 1783 contentCh <- &ClientContent{ 1784 Err: probe.NewError(err), 1785 } 1786 return 1787 } 1788 isRecursive := false 1789 for _, bucket := range buckets { 1790 for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive) { 1791 if object.Err != nil { 1792 contentCh <- &ClientContent{ 1793 Err: probe.NewError(object.Err), 1794 } 1795 return 1796 } 1797 content := &ClientContent{} 1798 url := c.targetURL.Clone() 1799 // Join bucket with - incoming object key. 1800 url.Path = c.joinPath(bucket.Name, object.Key) 1801 switch { 1802 case strings.HasSuffix(object.Key, string(c.targetURL.Separator)): 1803 // We need to keep the trailing Separator, do not use filepath.Join(). 1804 content.URL = url 1805 content.Time = time.Now() 1806 content.Type = os.ModeDir 1807 default: 1808 content.URL = url 1809 content.Size = object.Size 1810 content.Time = object.Initiated 1811 content.Type = os.ModeTemporary 1812 } 1813 contentCh <- content 1814 } 1815 } 1816 default: 1817 isRecursive := false 1818 for object := range c.api.ListIncompleteUploads(ctx, b, o, isRecursive) { 1819 if object.Err != nil { 1820 contentCh <- &ClientContent{ 1821 Err: probe.NewError(object.Err), 1822 } 1823 return 1824 } 1825 content := &ClientContent{} 1826 url := c.targetURL.Clone() 1827 // Join bucket with - incoming object key. 1828 url.Path = c.joinPath(b, object.Key) 1829 switch { 1830 case strings.HasSuffix(object.Key, string(c.targetURL.Separator)): 1831 // We need to keep the trailing Separator, do not use filepath.Join(). 1832 content.URL = url 1833 content.Time = time.Now() 1834 content.Type = os.ModeDir 1835 default: 1836 content.URL = url 1837 content.Size = object.Size 1838 content.Time = object.Initiated 1839 content.Type = os.ModeTemporary 1840 } 1841 contentCh <- content 1842 } 1843 } 1844} 1845 1846func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { 1847 // get bucket and object from URL. 1848 b, o := c.url2BucketAndObject() 1849 switch { 1850 case b == "" && o == "": 1851 buckets, err := c.api.ListBuckets(ctx) 1852 if err != nil { 1853 contentCh <- &ClientContent{ 1854 Err: probe.NewError(err), 1855 } 1856 return 1857 } 1858 isRecursive := true 1859 for _, bucket := range buckets { 1860 if opts.ShowDir != DirLast { 1861 contentCh <- c.bucketInfo2ClientContent(bucket) 1862 } 1863 1864 for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive) { 1865 if object.Err != nil { 1866 contentCh <- &ClientContent{ 1867 Err: probe.NewError(object.Err), 1868 } 1869 return 1870 } 1871 url := c.targetURL.Clone() 1872 url.Path = c.joinPath(bucket.Name, object.Key) 1873 content := &ClientContent{} 1874 content.URL = url 1875 content.Size = object.Size 1876 content.Time = object.Initiated 1877 content.Type = os.ModeTemporary 1878 contentCh <- content 1879 } 1880 1881 if opts.ShowDir == DirLast { 1882 contentCh <- c.bucketInfo2ClientContent(bucket) 1883 } 1884 } 1885 default: 1886 isRecursive := true 1887 for object := range c.api.ListIncompleteUploads(ctx, b, o, isRecursive) { 1888 if object.Err != nil { 1889 contentCh <- &ClientContent{ 1890 Err: probe.NewError(object.Err), 1891 } 1892 return 1893 } 1894 url := c.targetURL.Clone() 1895 // Join bucket and incoming object key. 1896 url.Path = c.joinPath(b, object.Key) 1897 content := &ClientContent{} 1898 content.URL = url 1899 content.Size = object.Size 1900 content.Time = object.Initiated 1901 content.Type = os.ModeTemporary 1902 contentCh <- content 1903 } 1904 } 1905} 1906 1907// Returns new path by joining path segments with URL path separator. 1908func (c *S3Client) joinPath(bucket string, objects ...string) string { 1909 p := string(c.targetURL.Separator) + bucket 1910 for _, o := range objects { 1911 p += string(c.targetURL.Separator) + o 1912 } 1913 return p 1914} 1915 1916// Convert objectInfo to ClientContent 1917func (c *S3Client) bucketInfo2ClientContent(bucket minio.BucketInfo) *ClientContent { 1918 content := &ClientContent{} 1919 url := c.targetURL.Clone() 1920 url.Path = c.joinPath(bucket.Name) 1921 content.URL = url 1922 content.Size = 0 1923 content.Time = bucket.CreationDate 1924 content.Type = os.ModeDir 1925 return content 1926} 1927 1928// Convert objectInfo to ClientContent 1929func (c *S3Client) objectInfo2ClientContent(bucket string, entry minio.ObjectInfo) *ClientContent { 1930 content := &ClientContent{} 1931 url := c.targetURL.Clone() 1932 // Join bucket and incoming object key. 1933 if bucket == "" { 1934 panic("should never happen, bucket cannot be empty") 1935 } 1936 url.Path = c.joinPath(bucket, entry.Key) 1937 content.URL = url 1938 content.Size = entry.Size 1939 content.ETag = entry.ETag 1940 content.Time = entry.LastModified 1941 content.Expires = entry.Expires 1942 content.Expiration = entry.Expiration 1943 content.ExpirationRuleID = entry.ExpirationRuleID 1944 content.VersionID = entry.VersionID 1945 content.StorageClass = entry.StorageClass 1946 content.IsDeleteMarker = entry.IsDeleteMarker 1947 content.IsLatest = entry.IsLatest 1948 content.Restore = entry.Restore 1949 content.Metadata = map[string]string{} 1950 content.UserMetadata = map[string]string{} 1951 content.ReplicationStatus = entry.ReplicationStatus 1952 for k, v := range entry.UserMetadata { 1953 content.UserMetadata[k] = v 1954 } 1955 for k := range entry.Metadata { 1956 content.Metadata[k] = entry.Metadata.Get(k) 1957 } 1958 attr, _ := parseAttribute(content.UserMetadata) 1959 if len(attr) > 0 { 1960 _, mtime, _ := parseAtimeMtime(attr) 1961 if !mtime.IsZero() { 1962 content.Time = mtime 1963 } 1964 } 1965 attr, _ = parseAttribute(content.Metadata) 1966 if len(attr) > 0 { 1967 _, mtime, _ := parseAtimeMtime(attr) 1968 if !mtime.IsZero() { 1969 content.Time = mtime 1970 } 1971 } 1972 1973 if strings.HasSuffix(entry.Key, string(c.targetURL.Separator)) { 1974 content.Type = os.ModeDir 1975 if content.Time.IsZero() { 1976 content.Time = time.Now() 1977 } 1978 } else { 1979 content.Type = os.FileMode(0664) 1980 } 1981 1982 return content 1983} 1984 1985// Returns bucket stat info of current bucket. 1986func (c *S3Client) bucketStat(ctx context.Context, bucket string) (*ClientContent, *probe.Error) { 1987 exists, e := c.api.BucketExists(ctx, bucket) 1988 if e != nil { 1989 return nil, probe.NewError(e) 1990 } 1991 if !exists { 1992 return nil, probe.NewError(BucketDoesNotExist{Bucket: bucket}) 1993 } 1994 return &ClientContent{URL: c.targetURL.Clone(), Time: time.Unix(0, 0), Type: os.ModeDir}, nil 1995} 1996 1997func (c *S3Client) listInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { 1998 // get bucket and object from URL. 1999 b, o := c.url2BucketAndObject() 2000 switch { 2001 case b == "" && o == "": 2002 buckets, e := c.api.ListBuckets(ctx) 2003 if e != nil { 2004 contentCh <- &ClientContent{ 2005 Err: probe.NewError(e), 2006 } 2007 return 2008 } 2009 for _, bucket := range buckets { 2010 contentCh <- c.bucketInfo2ClientContent(bucket) 2011 } 2012 case b != "" && !strings.HasSuffix(c.targetURL.Path, string(c.targetURL.Separator)) && o == "": 2013 content, err := c.bucketStat(ctx, b) 2014 if err != nil { 2015 contentCh <- &ClientContent{Err: err.Trace(b)} 2016 return 2017 } 2018 contentCh <- content 2019 default: 2020 isRecursive := false 2021 for object := range c.listObjectWrapper(ctx, b, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) { 2022 if object.Err != nil { 2023 contentCh <- &ClientContent{ 2024 Err: probe.NewError(object.Err), 2025 } 2026 return 2027 } 2028 2029 // Avoid sending an empty directory when we are specifically listing it 2030 if strings.HasSuffix(object.Key, string(c.targetURL.Separator)) && o == object.Key { 2031 continue 2032 } 2033 2034 contentCh <- c.objectInfo2ClientContent(b, object) 2035 } 2036 } 2037} 2038 2039// S3 offers a range of storage classes designed for 2040// different use cases, following list captures these. 2041const ( 2042 // General purpose. 2043 // s3StorageClassStandard = "STANDARD" 2044 // Infrequent access. 2045 // s3StorageClassInfrequent = "STANDARD_IA" 2046 // Reduced redundancy access. 2047 // s3StorageClassRedundancy = "REDUCED_REDUNDANCY" 2048 // Archive access. 2049 s3StorageClassGlacier = "GLACIER" 2050) 2051 2052func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) { 2053 // get bucket and object from URL. 2054 b, o := c.url2BucketAndObject() 2055 switch { 2056 case b == "" && o == "": 2057 buckets, err := c.api.ListBuckets(ctx) 2058 if err != nil { 2059 contentCh <- &ClientContent{ 2060 Err: probe.NewError(err), 2061 } 2062 return 2063 } 2064 for _, bucket := range buckets { 2065 if opts.ShowDir == DirFirst { 2066 contentCh <- c.bucketInfo2ClientContent(bucket) 2067 } 2068 2069 isRecursive := true 2070 for object := range c.listObjectWrapper(ctx, bucket.Name, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) { 2071 if object.Err != nil { 2072 contentCh <- &ClientContent{ 2073 Err: probe.NewError(object.Err), 2074 } 2075 return 2076 } 2077 contentCh <- c.objectInfo2ClientContent(bucket.Name, object) 2078 } 2079 2080 if opts.ShowDir == DirLast { 2081 contentCh <- c.bucketInfo2ClientContent(bucket) 2082 } 2083 } 2084 default: 2085 isRecursive := true 2086 for object := range c.listObjectWrapper(ctx, b, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) { 2087 if object.Err != nil { 2088 contentCh <- &ClientContent{ 2089 Err: probe.NewError(object.Err), 2090 } 2091 return 2092 } 2093 contentCh <- c.objectInfo2ClientContent(b, object) 2094 } 2095 } 2096} 2097 2098// ShareDownload - get a usable presigned object url to share. 2099func (c *S3Client) ShareDownload(ctx context.Context, versionID string, expires time.Duration) (string, *probe.Error) { 2100 bucket, object := c.url2BucketAndObject() 2101 // No additional request parameters are set for the time being. 2102 reqParams := make(url.Values) 2103 if versionID != "" { 2104 reqParams.Set("versionId", versionID) 2105 } 2106 presignedURL, e := c.api.PresignedGetObject(ctx, bucket, object, expires, reqParams) 2107 if e != nil { 2108 return "", probe.NewError(e) 2109 } 2110 return presignedURL.String(), nil 2111} 2112 2113// ShareUpload - get data for presigned post http form upload. 2114func (c *S3Client) ShareUpload(ctx context.Context, isRecursive bool, expires time.Duration, contentType string) (string, map[string]string, *probe.Error) { 2115 bucket, object := c.url2BucketAndObject() 2116 p := minio.NewPostPolicy() 2117 if e := p.SetExpires(UTCNow().Add(expires)); e != nil { 2118 return "", nil, probe.NewError(e) 2119 } 2120 if strings.TrimSpace(contentType) != "" || contentType != "" { 2121 // No need to verify for error here, since we have stripped out spaces. 2122 p.SetContentType(contentType) 2123 } 2124 if e := p.SetBucket(bucket); e != nil { 2125 return "", nil, probe.NewError(e) 2126 } 2127 if isRecursive { 2128 if e := p.SetKeyStartsWith(object); e != nil { 2129 return "", nil, probe.NewError(e) 2130 } 2131 } else { 2132 if e := p.SetKey(object); e != nil { 2133 return "", nil, probe.NewError(e) 2134 } 2135 } 2136 u, m, e := c.api.PresignedPostPolicy(ctx, p) 2137 if e != nil { 2138 return "", nil, probe.NewError(e) 2139 } 2140 return u.String(), m, nil 2141} 2142 2143// SetObjectLockConfig - Set object lock configurataion of bucket. 2144func (c *S3Client) SetObjectLockConfig(ctx context.Context, mode minio.RetentionMode, validity uint64, unit minio.ValidityUnit) *probe.Error { 2145 bucket, _ := c.url2BucketAndObject() 2146 2147 // FIXME: This is too ugly, fix minio-go 2148 vuint := (uint)(validity) 2149 if mode != "" && vuint > 0 && unit != "" { 2150 e := c.api.SetBucketObjectLockConfig(ctx, bucket, &mode, &vuint, &unit) 2151 if e != nil { 2152 return probe.NewError(e).Trace(c.GetURL().String()) 2153 } 2154 return nil 2155 } 2156 if mode == "" && vuint == 0 && unit == "" { 2157 e := c.api.SetBucketObjectLockConfig(ctx, bucket, nil, nil, nil) 2158 if e != nil { 2159 return probe.NewError(e).Trace(c.GetURL().String()) 2160 } 2161 return nil 2162 } 2163 return errInvalidArgument().Trace(c.GetURL().String()) 2164} 2165 2166// PutObjectRetention - Set object retention for a given object. 2167func (c *S3Client) PutObjectRetention(ctx context.Context, versionID string, mode minio.RetentionMode, retainUntilDate time.Time, bypassGovernance bool) *probe.Error { 2168 bucket, object := c.url2BucketAndObject() 2169 2170 var ( 2171 modePtr *minio.RetentionMode 2172 retainUntilDatePtr *time.Time 2173 ) 2174 2175 if mode != "" && retainUntilDate.IsZero() { 2176 return errInvalidArgument().Trace(c.GetURL().String()) 2177 } 2178 2179 if mode != "" { 2180 modePtr = &mode 2181 retainUntilDatePtr = &retainUntilDate 2182 } 2183 2184 opts := minio.PutObjectRetentionOptions{ 2185 VersionID: versionID, 2186 RetainUntilDate: retainUntilDatePtr, 2187 Mode: modePtr, 2188 GovernanceBypass: bypassGovernance, 2189 } 2190 e := c.api.PutObjectRetention(ctx, bucket, object, opts) 2191 if e != nil { 2192 return probe.NewError(e).Trace(c.GetURL().String()) 2193 } 2194 return nil 2195} 2196 2197// GetObjectRetention - Get object retention for a given object. 2198func (c *S3Client) GetObjectRetention(ctx context.Context, versionID string) (minio.RetentionMode, time.Time, *probe.Error) { 2199 bucket, object := c.url2BucketAndObject() 2200 if object == "" { 2201 return "", time.Time{}, probe.NewError(ObjectNameEmpty{}).Trace(c.GetURL().String()) 2202 } 2203 modePtr, untilPtr, e := c.api.GetObjectRetention(ctx, bucket, object, versionID) 2204 if e != nil { 2205 return "", time.Time{}, probe.NewError(e).Trace(c.GetURL().String()) 2206 } 2207 var ( 2208 mode minio.RetentionMode 2209 until time.Time 2210 ) 2211 if modePtr != nil { 2212 mode = *modePtr 2213 } 2214 if untilPtr != nil { 2215 until = *untilPtr 2216 } 2217 return mode, until, nil 2218} 2219 2220// PutObjectLegalHold - Set object legal hold for a given object. 2221func (c *S3Client) PutObjectLegalHold(ctx context.Context, versionID string, lhold minio.LegalHoldStatus) *probe.Error { 2222 bucket, object := c.url2BucketAndObject() 2223 if lhold.IsValid() { 2224 opts := minio.PutObjectLegalHoldOptions{ 2225 Status: &lhold, 2226 VersionID: versionID, 2227 } 2228 e := c.api.PutObjectLegalHold(ctx, bucket, object, opts) 2229 if e != nil { 2230 return probe.NewError(e).Trace(c.GetURL().String()) 2231 } 2232 return nil 2233 } 2234 return errInvalidArgument().Trace(c.GetURL().String()) 2235} 2236 2237// GetObjectLegalHold - Get object legal hold for a given object. 2238func (c *S3Client) GetObjectLegalHold(ctx context.Context, versionID string) (minio.LegalHoldStatus, *probe.Error) { 2239 var lhold minio.LegalHoldStatus 2240 bucket, object := c.url2BucketAndObject() 2241 opts := minio.GetObjectLegalHoldOptions{ 2242 VersionID: versionID, 2243 } 2244 lhPtr, e := c.api.GetObjectLegalHold(ctx, bucket, object, opts) 2245 if e != nil { 2246 errResp := minio.ToErrorResponse(e) 2247 if errResp.Code != "NoSuchObjectLockConfiguration" { 2248 return "", probe.NewError(e).Trace(c.GetURL().String()) 2249 } 2250 return "", nil 2251 } 2252 // lhPtr can be nil if there is no legalhold status set 2253 if lhPtr != nil { 2254 lhold = *lhPtr 2255 } 2256 return lhold, nil 2257} 2258 2259// GetObjectLockConfig - Get object lock configuration of bucket. 2260func (c *S3Client) GetObjectLockConfig(ctx context.Context) (string, minio.RetentionMode, uint64, minio.ValidityUnit, *probe.Error) { 2261 bucket, _ := c.url2BucketAndObject() 2262 2263 status, mode, validity, unit, e := c.api.GetObjectLockConfig(ctx, bucket) 2264 if e != nil { 2265 return "", "", 0, "", probe.NewError(e).Trace(c.GetURL().String()) 2266 } 2267 2268 if mode != nil && validity != nil && unit != nil { 2269 // FIXME: this is too ugly, fix minio-go 2270 vuint64 := uint64(*validity) 2271 return status, *mode, vuint64, *unit, nil 2272 } 2273 2274 return status, "", 0, "", nil 2275} 2276 2277// GetTags - Get tags of bucket or object. 2278func (c *S3Client) GetTags(ctx context.Context, versionID string) (map[string]string, *probe.Error) { 2279 bucketName, objectName := c.url2BucketAndObject() 2280 if bucketName == "" { 2281 return nil, probe.NewError(BucketNameEmpty{}) 2282 } 2283 2284 if objectName == "" { 2285 if versionID != "" { 2286 return nil, probe.NewError(errors.New("getting bucket tags does not support versioning parameters")) 2287 } 2288 2289 tags, err := c.api.GetBucketTagging(ctx, bucketName) 2290 if err != nil { 2291 return nil, probe.NewError(err) 2292 } 2293 2294 return tags.ToMap(), nil 2295 } 2296 2297 tags, err := c.api.GetObjectTagging(ctx, bucketName, objectName, minio.GetObjectTaggingOptions{VersionID: versionID}) 2298 if err != nil { 2299 return nil, probe.NewError(err) 2300 } 2301 2302 return tags.ToMap(), nil 2303} 2304 2305// SetTags - Set tags of bucket or object. 2306func (c *S3Client) SetTags(ctx context.Context, versionID, tagString string) *probe.Error { 2307 bucketName, objectName := c.url2BucketAndObject() 2308 if bucketName == "" { 2309 return probe.NewError(BucketNameEmpty{}) 2310 } 2311 2312 tags, err := tags.Parse(tagString, objectName != "") 2313 if err != nil { 2314 return probe.NewError(err) 2315 } 2316 2317 if objectName == "" { 2318 if versionID != "" { 2319 return probe.NewError(errors.New("setting bucket tags does not support versioning parameters")) 2320 } 2321 err = c.api.SetBucketTagging(ctx, bucketName, tags) 2322 } else { 2323 err = c.api.PutObjectTagging(ctx, bucketName, objectName, tags, minio.PutObjectTaggingOptions{VersionID: versionID}) 2324 } 2325 2326 if err != nil { 2327 return probe.NewError(err) 2328 } 2329 2330 return nil 2331} 2332 2333// DeleteTags - Delete tags of bucket or object 2334func (c *S3Client) DeleteTags(ctx context.Context, versionID string) *probe.Error { 2335 bucketName, objectName := c.url2BucketAndObject() 2336 if bucketName == "" { 2337 return probe.NewError(BucketNameEmpty{}) 2338 } 2339 2340 var err error 2341 if objectName == "" { 2342 if versionID != "" { 2343 return probe.NewError(errors.New("setting bucket tags does not support versioning parameters")) 2344 } 2345 err = c.api.RemoveBucketTagging(ctx, bucketName) 2346 } else { 2347 err = c.api.RemoveObjectTagging(ctx, bucketName, objectName, minio.RemoveObjectTaggingOptions{VersionID: versionID}) 2348 } 2349 2350 if err != nil { 2351 return probe.NewError(err) 2352 } 2353 2354 return nil 2355} 2356 2357// GetLifecycle - Get current lifecycle configuration. 2358func (c *S3Client) GetLifecycle(ctx context.Context) (*lifecycle.Configuration, *probe.Error) { 2359 bucket, _ := c.url2BucketAndObject() 2360 if bucket == "" { 2361 return nil, probe.NewError(BucketNameEmpty{}) 2362 } 2363 2364 config, e := c.api.GetBucketLifecycle(ctx, bucket) 2365 if e != nil { 2366 return nil, probe.NewError(e) 2367 } 2368 2369 return config, nil 2370} 2371 2372// SetLifecycle - Set lifecycle configuration on a bucket 2373func (c *S3Client) SetLifecycle(ctx context.Context, config *lifecycle.Configuration) *probe.Error { 2374 bucket, _ := c.url2BucketAndObject() 2375 if bucket == "" { 2376 return probe.NewError(BucketNameEmpty{}) 2377 } 2378 2379 if e := c.api.SetBucketLifecycle(ctx, bucket, config); e != nil { 2380 return probe.NewError(e) 2381 } 2382 2383 return nil 2384} 2385 2386// GetVersion - gets bucket version info. 2387func (c *S3Client) GetVersion(ctx context.Context) (config minio.BucketVersioningConfiguration, err *probe.Error) { 2388 bucket, _ := c.url2BucketAndObject() 2389 if bucket == "" { 2390 return config, probe.NewError(BucketNameEmpty{}) 2391 } 2392 var e error 2393 config, e = c.api.GetBucketVersioning(ctx, bucket) 2394 if e != nil { 2395 return config, probe.NewError(e) 2396 } 2397 2398 return config, nil 2399} 2400 2401// SetVersion - Set version configuration on a bucket 2402func (c *S3Client) SetVersion(ctx context.Context, status string) *probe.Error { 2403 bucket, _ := c.url2BucketAndObject() 2404 if bucket == "" { 2405 return probe.NewError(BucketNameEmpty{}) 2406 } 2407 var err error 2408 switch status { 2409 case "enable": 2410 err = c.api.EnableVersioning(ctx, bucket) 2411 case "suspend": 2412 err = c.api.SuspendVersioning(ctx, bucket) 2413 default: 2414 return probe.NewError(fmt.Errorf("Invalid versioning status")) 2415 } 2416 return probe.NewError(err) 2417} 2418 2419// GetReplication - gets replication configuration for a given bucket. 2420func (c *S3Client) GetReplication(ctx context.Context) (replication.Config, *probe.Error) { 2421 bucket, _ := c.url2BucketAndObject() 2422 if bucket == "" { 2423 return replication.Config{}, probe.NewError(BucketNameEmpty{}) 2424 } 2425 2426 replicationCfg, e := c.api.GetBucketReplication(ctx, bucket) 2427 if e != nil { 2428 return replication.Config{}, probe.NewError(e) 2429 } 2430 return replicationCfg, nil 2431} 2432 2433// RemoveReplication - removes replication configuration for a given bucket. 2434func (c *S3Client) RemoveReplication(ctx context.Context) *probe.Error { 2435 bucket, _ := c.url2BucketAndObject() 2436 if bucket == "" { 2437 return probe.NewError(BucketNameEmpty{}) 2438 } 2439 2440 e := c.api.RemoveBucketReplication(ctx, bucket) 2441 return probe.NewError(e) 2442} 2443 2444// SetReplication sets replication configuration for a given bucket. 2445func (c *S3Client) SetReplication(ctx context.Context, cfg *replication.Config, opts replication.Options) *probe.Error { 2446 bucket, objectPrefix := c.url2BucketAndObject() 2447 if bucket == "" { 2448 return probe.NewError(BucketNameEmpty{}) 2449 } 2450 opts.Prefix = objectPrefix 2451 switch opts.Op { 2452 case replication.AddOption: 2453 if e := cfg.AddRule(opts); e != nil { 2454 return probe.NewError(e) 2455 } 2456 case replication.SetOption: 2457 if e := cfg.EditRule(opts); e != nil { 2458 return probe.NewError(e) 2459 } 2460 case replication.RemoveOption: 2461 if e := cfg.RemoveRule(opts); e != nil { 2462 return probe.NewError(e) 2463 } 2464 case replication.ImportOption: 2465 default: 2466 return probe.NewError(fmt.Errorf("Invalid replication option")) 2467 } 2468 if e := c.api.SetBucketReplication(ctx, bucket, *cfg); e != nil { 2469 return probe.NewError(e) 2470 } 2471 return nil 2472} 2473 2474// GetReplicationMetrics - Get replication metrics for a given bucket. 2475func (c *S3Client) GetReplicationMetrics(ctx context.Context) (replication.Metrics, *probe.Error) { 2476 bucket, _ := c.url2BucketAndObject() 2477 if bucket == "" { 2478 return replication.Metrics{}, probe.NewError(BucketNameEmpty{}) 2479 } 2480 2481 metrics, e := c.api.GetBucketReplicationMetrics(ctx, bucket) 2482 if e != nil { 2483 return replication.Metrics{}, probe.NewError(e) 2484 } 2485 return metrics, nil 2486} 2487 2488// ResetReplication - kicks off replication again on previously replicated objects if existing object 2489// replication is enabled in the replication config.Optional to provide a timestamp 2490func (c *S3Client) ResetReplication(ctx context.Context, before time.Duration, tgtArn string) (rinfo replication.ResyncTargetsInfo, err *probe.Error) { 2491 bucket, _ := c.url2BucketAndObject() 2492 if bucket == "" { 2493 return rinfo, probe.NewError(BucketNameEmpty{}) 2494 } 2495 2496 rinfo, e := c.api.ResetBucketReplicationOnTarget(ctx, bucket, before, tgtArn) 2497 if e != nil { 2498 return rinfo, probe.NewError(e) 2499 } 2500 return rinfo, nil 2501} 2502 2503// GetEncryption - gets bucket encryption info. 2504func (c *S3Client) GetEncryption(ctx context.Context) (algorithm, keyID string, err *probe.Error) { 2505 bucket, _ := c.url2BucketAndObject() 2506 if bucket == "" { 2507 return "", "", probe.NewError(BucketNameEmpty{}) 2508 } 2509 2510 config, e := c.api.GetBucketEncryption(ctx, bucket) 2511 if e != nil { 2512 return "", "", probe.NewError(e) 2513 } 2514 for _, rule := range config.Rules { 2515 algorithm = rule.Apply.SSEAlgorithm 2516 if rule.Apply.KmsMasterKeyID != "" { 2517 keyID = rule.Apply.KmsMasterKeyID 2518 break 2519 } 2520 } 2521 return algorithm, keyID, nil 2522} 2523 2524// SetEncryption - Set encryption configuration on a bucket 2525func (c *S3Client) SetEncryption(ctx context.Context, encType string, kmsKeyID string) *probe.Error { 2526 bucket, _ := c.url2BucketAndObject() 2527 if bucket == "" { 2528 return probe.NewError(BucketNameEmpty{}) 2529 } 2530 var config *sse.Configuration 2531 switch strings.ToLower(encType) { 2532 case "sse-kms": 2533 config = sse.NewConfigurationSSEKMS(kmsKeyID) 2534 case "sse-s3": 2535 config = sse.NewConfigurationSSES3() 2536 default: 2537 return probe.NewError(fmt.Errorf("Invalid encryption algorithm %s", encType)) 2538 } 2539 if err := c.api.SetBucketEncryption(ctx, bucket, config); err != nil { 2540 return probe.NewError(err) 2541 } 2542 return nil 2543} 2544 2545// DeleteEncryption - removes encryption configuration on a bucket 2546func (c *S3Client) DeleteEncryption(ctx context.Context) *probe.Error { 2547 bucket, _ := c.url2BucketAndObject() 2548 if bucket == "" { 2549 return probe.NewError(BucketNameEmpty{}) 2550 } 2551 if err := c.api.RemoveBucketEncryption(ctx, bucket); err != nil { 2552 return probe.NewError(err) 2553 } 2554 return nil 2555} 2556 2557// GetBucketInfo gets info about a bucket 2558func (c *S3Client) GetBucketInfo(ctx context.Context) (BucketInfo, *probe.Error) { 2559 var b BucketInfo 2560 bucket, _ := c.url2BucketAndObject() 2561 if bucket == "" { 2562 return b, probe.NewError(BucketNameEmpty{}) 2563 } 2564 content, err := c.bucketStat(ctx, bucket) 2565 if err != nil { 2566 return b, err.Trace(bucket) 2567 } 2568 b.URL = content.URL 2569 b.Size = content.Size 2570 b.Type = content.Type 2571 b.Date = content.Time 2572 if vcfg, err := c.GetVersion(ctx); err == nil { 2573 b.Versioning.Status = vcfg.Status 2574 b.Versioning.MFADelete = vcfg.MFADelete 2575 } 2576 if enabled, mode, validity, unit, err := c.api.GetObjectLockConfig(ctx, bucket); err == nil { 2577 if mode != nil { 2578 b.Locking.Mode = *mode 2579 } 2580 b.Locking.Enabled = enabled 2581 if validity != nil && unit != nil { 2582 vuint64 := uint64(*validity) 2583 b.Locking.Validity = fmt.Sprintf("%d%s", vuint64, unit) 2584 } 2585 } 2586 2587 if rcfg, err := c.GetReplication(ctx); err == nil { 2588 if !rcfg.Empty() { 2589 b.Replication.Enabled = true 2590 } 2591 } 2592 if algo, keyID, err := c.GetEncryption(ctx); err == nil { 2593 b.Encryption.Algorithm = algo 2594 b.Encryption.KeyID = keyID 2595 } 2596 2597 if pType, policyStr, err := c.GetAccess(ctx); err == nil { 2598 b.Policy.Type = pType 2599 b.Policy.Text = policyStr 2600 } 2601 location, e := c.api.GetBucketLocation(ctx, bucket) 2602 if e != nil { 2603 return b, probe.NewError(e) 2604 } 2605 b.Location = location 2606 if tags, err := c.GetTags(ctx, ""); err == nil { 2607 b.Tagging = tags 2608 } 2609 if lfc, err := c.GetLifecycle(ctx); err == nil { 2610 b.ILM.Config = lfc 2611 } 2612 if nfc, err := c.api.GetBucketNotification(ctx, bucket); err == nil { 2613 b.Notification.Config = nfc 2614 } 2615 return b, nil 2616} 2617 2618// Restore gets a copy of an archived object 2619func (c *S3Client) Restore(ctx context.Context, versionID string, days int) *probe.Error { 2620 bucket, object := c.url2BucketAndObject() 2621 if bucket == "" { 2622 return probe.NewError(BucketNameEmpty{}) 2623 } 2624 if object == "" { 2625 return probe.NewError(ObjectNameEmpty{}) 2626 } 2627 2628 req := minio.RestoreRequest{} 2629 req.SetDays(days) 2630 req.SetGlacierJobParameters(minio.GlacierJobParameters{Tier: minio.TierExpedited}) 2631 if err := c.api.RestoreObject(ctx, bucket, object, versionID, req); err != nil { 2632 return probe.NewError(err) 2633 } 2634 return nil 2635} 2636