1//
2// Copyright (c) 2018, Joyent, Inc. All rights reserved.
3//
4// This Source Code Form is subject to the terms of the Mozilla Public
5// License, v. 2.0. If a copy of the MPL was not distributed with this
6// file, You can obtain one at http://mozilla.org/MPL/2.0/.
7//
8
9package storage
10
11import (
12	"context"
13	"encoding/json"
14	"io"
15	"net/http"
16	"net/url"
17	"path"
18	"strconv"
19	"strings"
20	"time"
21
22	"github.com/joyent/triton-go/client"
23	tt "github.com/joyent/triton-go/errors"
24	"github.com/pkg/errors"
25)
26
27type ObjectsClient struct {
28	client *client.Client
29}
30
31// AbortMpuInput represents parameters to an AbortMpu operation
32type AbortMpuInput struct {
33	PartsDirectoryPath string
34}
35
36func (s *ObjectsClient) AbortMultipartUpload(ctx context.Context, input *AbortMpuInput) error {
37	return abortMpu(*s, ctx, input)
38}
39
40// CommitMpuInput represents parameters to a CommitMpu operation
41type CommitMpuInput struct {
42	Id      string
43	Headers map[string]string
44	Body    CommitMpuBody
45}
46
47// CommitMpuBody represents the body of a CommitMpu request
48type CommitMpuBody struct {
49	Parts []string `json:"parts"`
50}
51
52func (s *ObjectsClient) CommitMultipartUpload(ctx context.Context, input *CommitMpuInput) error {
53	return commitMpu(*s, ctx, input)
54}
55
56// CreateMpuInput represents parameters to a CreateMpu operation.
57type CreateMpuInput struct {
58	Body            CreateMpuBody
59	ContentLength   uint64
60	ContentMD5      string
61	DurabilityLevel uint64
62	ForceInsert     bool //Force the creation of the directory tree
63}
64
65// CreateMpuOutput represents the response from a CreateMpu operation
66type CreateMpuOutput struct {
67	Id             string `json:"id"`
68	PartsDirectory string `json:"partsDirectory"`
69}
70
71// CreateMpuBody represents the body of a CreateMpu request.
72type CreateMpuBody struct {
73	ObjectPath string            `json:"objectPath"`
74	Headers    map[string]string `json:"headers,omitempty"`
75}
76
77func (s *ObjectsClient) CreateMultipartUpload(ctx context.Context, input *CreateMpuInput) (*CreateMpuOutput, error) {
78	return createMpu(*s, ctx, input)
79}
80
81// GetObjectInput represents parameters to a GetObject operation.
82type GetInfoInput struct {
83	ObjectPath string
84	Headers    map[string]string
85}
86
87// GetObjectOutput contains the outputs for a GetObject operation. It is your
88// responsibility to ensure that the io.ReadCloser ObjectReader is closed.
89type GetInfoOutput struct {
90	ContentLength uint64
91	ContentType   string
92	LastModified  time.Time
93	ContentMD5    string
94	ETag          string
95	Metadata      map[string]string
96}
97
98// GetInfo sends a HEAD request to an object in the Manta service. This function
99// does not return a response body.
100func (s *ObjectsClient) GetInfo(ctx context.Context, input *GetInfoInput) (*GetInfoOutput, error) {
101	absPath := absFileInput(s.client.AccountName, input.ObjectPath)
102
103	headers := &http.Header{}
104	for key, value := range input.Headers {
105		headers.Set(key, value)
106	}
107
108	reqInput := client.RequestInput{
109		Method:  http.MethodHead,
110		Path:    string(absPath),
111		Headers: headers,
112	}
113	_, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput)
114	if err != nil {
115		return nil, errors.Wrap(err, "unable to get info")
116	}
117
118	response := &GetInfoOutput{
119		ContentType: respHeaders.Get("Content-Type"),
120		ContentMD5:  respHeaders.Get("Content-MD5"),
121		ETag:        respHeaders.Get("Etag"),
122	}
123
124	lastModified, err := time.Parse(time.RFC1123, respHeaders.Get("Last-Modified"))
125	if err == nil {
126		response.LastModified = lastModified
127	}
128
129	contentLength, err := strconv.ParseUint(respHeaders.Get("Content-Length"), 10, 64)
130	if err == nil {
131		response.ContentLength = contentLength
132	}
133
134	metadata := map[string]string{}
135	for key, values := range respHeaders {
136		if strings.HasPrefix(key, "m-") {
137			metadata[key] = strings.Join(values, ", ")
138		}
139	}
140	response.Metadata = metadata
141
142	return response, nil
143}
144
145// IsDir is a convenience wrapper around the GetInfo function which takes an
146// ObjectPath and returns a boolean whether or not the object is a directory
147// type in Manta. Returns an error if GetInfo failed upstream for some reason.
148func (s *ObjectsClient) IsDir(ctx context.Context, objectPath string) (bool, error) {
149	info, err := s.GetInfo(ctx, &GetInfoInput{
150		ObjectPath: objectPath,
151	})
152	if err != nil {
153		return false, err
154	}
155	if info != nil {
156		return strings.HasSuffix(info.ContentType, "type=directory"), nil
157	}
158	return false, nil
159}
160
161// GetObjectInput represents parameters to a GetObject operation.
162type GetObjectInput struct {
163	ObjectPath string
164	Headers    map[string]string
165}
166
167// GetObjectOutput contains the outputs for a GetObject operation. It is your
168// responsibility to ensure that the io.ReadCloser ObjectReader is closed.
169type GetObjectOutput struct {
170	ContentLength uint64
171	ContentType   string
172	LastModified  time.Time
173	ContentMD5    string
174	ETag          string
175	Metadata      map[string]string
176	ObjectReader  io.ReadCloser
177}
178
179// Get retrieves an object from the Manta service. If error is nil (i.e. the
180// call returns successfully), it is your responsibility to close the
181// io.ReadCloser named ObjectReader in the operation output.
182func (s *ObjectsClient) Get(ctx context.Context, input *GetObjectInput) (*GetObjectOutput, error) {
183	absPath := absFileInput(s.client.AccountName, input.ObjectPath)
184
185	headers := &http.Header{}
186	for key, value := range input.Headers {
187		headers.Set(key, value)
188	}
189
190	reqInput := client.RequestInput{
191		Method:  http.MethodGet,
192		Path:    string(absPath),
193		Headers: headers,
194	}
195	respBody, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput)
196	if err != nil {
197		return nil, errors.Wrap(err, "unable to get object")
198	}
199
200	response := &GetObjectOutput{
201		ContentType:  respHeaders.Get("Content-Type"),
202		ContentMD5:   respHeaders.Get("Content-MD5"),
203		ETag:         respHeaders.Get("Etag"),
204		ObjectReader: respBody,
205	}
206
207	lastModified, err := time.Parse(time.RFC1123, respHeaders.Get("Last-Modified"))
208	if err == nil {
209		response.LastModified = lastModified
210	}
211
212	contentLength, err := strconv.ParseUint(respHeaders.Get("Content-Length"), 10, 64)
213	if err == nil {
214		response.ContentLength = contentLength
215	}
216
217	metadata := map[string]string{}
218	for key, values := range respHeaders {
219		if strings.HasPrefix(key, "m-") {
220			metadata[key] = strings.Join(values, ", ")
221		}
222	}
223	response.Metadata = metadata
224
225	return response, nil
226}
227
228// DeleteObjectInput represents parameters to a DeleteObject operation.
229type DeleteObjectInput struct {
230	ObjectPath string
231	Headers    map[string]string
232}
233
234// DeleteObject deletes an object.
235func (s *ObjectsClient) Delete(ctx context.Context, input *DeleteObjectInput) error {
236	absPath := absFileInput(s.client.AccountName, input.ObjectPath)
237
238	headers := &http.Header{}
239	for key, value := range input.Headers {
240		headers.Set(key, value)
241	}
242
243	reqInput := client.RequestInput{
244		Method:  http.MethodDelete,
245		Path:    string(absPath),
246		Headers: headers,
247	}
248	respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
249	if respBody != nil {
250		defer respBody.Close()
251	}
252	if err != nil {
253		return errors.Wrap(err, "unable to delete object")
254	}
255
256	return nil
257}
258
259// GetMpuInput represents parameters to a GetMpu operation
260type GetMpuInput struct {
261	PartsDirectoryPath string
262}
263
264type GetMpuHeaders struct {
265	ContentLength int64  `json:"content-length"`
266	ContentMd5    string `json:"content-md5"`
267}
268
269type GetMpuOutput struct {
270	Id             string        `json:"id"`
271	State          string        `json:"state"`
272	PartsDirectory string        `json:"partsDirectory"`
273	TargetObject   string        `json:"targetObject"`
274	Headers        GetMpuHeaders `json:"headers"`
275	NumCopies      int64         `json:"numCopies"`
276	CreationTimeMs int64         `json:"creationTimeMs"`
277}
278
279func (s *ObjectsClient) GetMultipartUpload(ctx context.Context, input *GetMpuInput) (*GetMpuOutput, error) {
280	return getMpu(*s, ctx, input)
281}
282
283type ListMpuPartsInput struct {
284	Id string
285}
286
287type ListMpuPart struct {
288	ETag       string
289	PartNumber int
290	Size       int64
291}
292
293type ListMpuPartsOutput struct {
294	Parts []ListMpuPart
295}
296
297func (s *ObjectsClient) ListMultipartUploadParts(ctx context.Context, input *ListMpuPartsInput) (*ListMpuPartsOutput, error) {
298	return listMpuParts(*s, ctx, input)
299}
300
301// PutObjectMetadataInput represents parameters to a PutObjectMetadata operation.
302type PutObjectMetadataInput struct {
303	ObjectPath  string
304	ContentType string
305	Metadata    map[string]string
306}
307
308// PutObjectMetadata allows you to overwrite the HTTP headers for an already
309// existing object, without changing the data. Note this is an idempotent "replace"
310// operation, so you must specify the complete set of HTTP headers you want
311// stored on each request.
312//
313// You cannot change "critical" headers:
314// 	- Content-Length
315//	- Content-MD5
316//	- Durability-Level
317func (s *ObjectsClient) PutMetadata(ctx context.Context, input *PutObjectMetadataInput) error {
318	absPath := absFileInput(s.client.AccountName, input.ObjectPath)
319	query := &url.Values{}
320	query.Set("metadata", "true")
321
322	headers := &http.Header{}
323	headers.Set("Content-Type", input.ContentType)
324	for key, value := range input.Metadata {
325		headers.Set(key, value)
326	}
327
328	reqInput := client.RequestInput{
329		Method:  http.MethodPut,
330		Path:    string(absPath),
331		Query:   query,
332		Headers: headers,
333	}
334	respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
335	if respBody != nil {
336		defer respBody.Close()
337	}
338	if err != nil {
339		return errors.Wrap(err, "unable to put metadata")
340	}
341
342	return nil
343}
344
345// PutObjectInput represents parameters to a PutObject operation.
346type PutObjectInput struct {
347	ObjectPath       string
348	DurabilityLevel  uint64
349	ContentType      string
350	ContentMD5       string
351	IfMatch          string
352	IfModifiedSince  *time.Time
353	ContentLength    uint64
354	MaxContentLength uint64
355	ObjectReader     io.Reader
356	Headers          map[string]string
357	ForceInsert      bool //Force the creation of the directory tree
358}
359
360func (s *ObjectsClient) Put(ctx context.Context, input *PutObjectInput) error {
361	absPath := absFileInput(s.client.AccountName, input.ObjectPath)
362	if input.ForceInsert {
363		absDirName := _AbsCleanPath(path.Dir(string(absPath)))
364		exists, err := checkDirectoryTreeExists(*s, ctx, absDirName)
365		if err != nil {
366			return err
367		}
368		if !exists {
369			err := createDirectory(*s, ctx, absDirName)
370			if err != nil {
371				return err
372			}
373			return putObject(*s, ctx, input, absPath)
374		}
375	}
376
377	return putObject(*s, ctx, input, absPath)
378}
379
380// UploadPartInput represents parameters to a UploadPart operation.
381type UploadPartInput struct {
382	Id           string
383	PartNum      uint64
384	ContentMD5   string
385	Headers      map[string]string
386	ObjectReader io.Reader
387}
388
389// UploadPartOutput represents the response from a
390type UploadPartOutput struct {
391	Part string `json:"part"`
392}
393
394func (s *ObjectsClient) UploadPart(ctx context.Context, input *UploadPartInput) (*UploadPartOutput, error) {
395	return uploadPart(*s, ctx, input)
396}
397
398// _AbsCleanPath is an internal type that means the input has been
399// path.Clean()'ed and is an absolute path.
400type _AbsCleanPath string
401
402func absFileInput(accountName, objPath string) _AbsCleanPath {
403	cleanInput := path.Clean(objPath)
404	if strings.HasPrefix(cleanInput, path.Join("/", accountName, "/")) {
405		return _AbsCleanPath(cleanInput)
406	}
407
408	cleanAbs := path.Clean(path.Join("/", accountName, objPath))
409	return _AbsCleanPath(cleanAbs)
410}
411
412func putObject(c ObjectsClient, ctx context.Context, input *PutObjectInput, absPath _AbsCleanPath) error {
413	if input.MaxContentLength != 0 && input.ContentLength != 0 {
414		return errors.New("ContentLength and MaxContentLength may not both be set to non-zero values.")
415	}
416
417	headers := &http.Header{}
418	for key, value := range input.Headers {
419		headers.Set(key, value)
420	}
421	if input.DurabilityLevel != 0 {
422		headers.Set("Durability-Level", strconv.FormatUint(input.DurabilityLevel, 10))
423	}
424	if input.ContentType != "" {
425		headers.Set("Content-Type", input.ContentType)
426	}
427	if input.ContentMD5 != "" {
428		headers.Set("Content-MD$", input.ContentMD5)
429	}
430	if input.IfMatch != "" {
431		headers.Set("If-Match", input.IfMatch)
432	}
433	if input.IfModifiedSince != nil {
434		headers.Set("If-Modified-Since", input.IfModifiedSince.Format(time.RFC1123))
435	}
436	if input.ContentLength != 0 {
437		headers.Set("Content-Length", strconv.FormatUint(input.ContentLength, 10))
438	}
439	if input.MaxContentLength != 0 {
440		headers.Set("Max-Content-Length", strconv.FormatUint(input.MaxContentLength, 10))
441	}
442
443	reqInput := client.RequestNoEncodeInput{
444		Method:  http.MethodPut,
445		Path:    string(absPath),
446		Headers: headers,
447		Body:    input.ObjectReader,
448	}
449	respBody, _, err := c.client.ExecuteRequestNoEncode(ctx, reqInput)
450	if respBody != nil {
451		defer respBody.Close()
452	}
453	if err != nil {
454		return errors.Wrap(err, "unable to put object")
455	}
456
457	return nil
458}
459
460func createDirectory(c ObjectsClient, ctx context.Context, absPath _AbsCleanPath) error {
461	dirClient := &DirectoryClient{
462		client: c.client,
463	}
464
465	// An abspath starts w/ a leading "/" which gets added to the slice as an
466	// empty string. Start all array math at 1.
467	parts := strings.Split(string(absPath), "/")
468	if len(parts) < 2 {
469		return errors.New("no path components to create directory")
470	}
471
472	folderPath := parts[1]
473	// Don't attempt to create a manta account as a directory
474	for i := 2; i < len(parts); i++ {
475		part := parts[i]
476		folderPath = path.Clean(path.Join("/", folderPath, part))
477		err := dirClient.Put(ctx, &PutDirectoryInput{
478			DirectoryName: folderPath,
479		})
480		if err != nil {
481			return err
482		}
483	}
484
485	return nil
486}
487
488func abortMpu(c ObjectsClient, ctx context.Context, input *AbortMpuInput) error {
489	reqInput := client.RequestInput{
490		Method:  http.MethodPost,
491		Path:    input.PartsDirectoryPath + "/abort",
492		Headers: &http.Header{},
493		Body:    nil,
494	}
495	respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput)
496	if err != nil {
497		return errors.Wrap(err, "unable to abort mpu")
498	}
499
500	if respBody != nil {
501		defer respBody.Close()
502	}
503
504	return nil
505}
506
507func commitMpu(c ObjectsClient, ctx context.Context, input *CommitMpuInput) error {
508	headers := &http.Header{}
509	for key, value := range input.Headers {
510		headers.Set(key, value)
511	}
512
513	// The mpu directory prefix length is derived from the final character
514	// in the mpu identifier which we'll call P. The mpu prefix itself is
515	// the first P characters of the mpu identifier. In order to derive the
516	// correct directory structure we need to parse this information from
517	// the mpu identifier
518	id := input.Id
519	idLength := len(id)
520	prefixLen, err := strconv.Atoi(id[idLength-1 : idLength])
521	if err != nil {
522		return errors.Wrap(err, "unable to commit mpu due to invalid mpu prefix length")
523	}
524	prefix := id[:prefixLen]
525	partPath := "/" + c.client.AccountName + "/uploads/" + prefix + "/" + input.Id + "/commit"
526
527	reqInput := client.RequestInput{
528		Method:  http.MethodPost,
529		Path:    partPath,
530		Headers: headers,
531		Body:    input.Body,
532	}
533	respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput)
534	if err != nil {
535		return errors.Wrap(err, "unable to commit mpu")
536	}
537
538	if respBody != nil {
539		defer respBody.Close()
540	}
541
542	return nil
543}
544
545func createMpu(c ObjectsClient, ctx context.Context, input *CreateMpuInput) (*CreateMpuOutput, error) {
546	absPath := absFileInput(c.client.AccountName, input.Body.ObjectPath)
547
548	// Because some clients will be treating Manta like S3, they will
549	// include slashes in object names which we'll need to convert to
550	// directories
551	if input.ForceInsert {
552		absDirName := _AbsCleanPath(path.Dir(string(absPath)))
553		exists, _ := checkDirectoryTreeExists(c, ctx, absDirName)
554		if !exists {
555			err := createDirectory(c, ctx, absDirName)
556			if err != nil {
557				return nil, errors.Wrap(err, "unable to create directory for create mpu operation")
558			}
559		}
560	}
561	headers := &http.Header{}
562	for key, value := range input.Body.Headers {
563		headers.Set(key, value)
564	}
565	if input.DurabilityLevel != 0 {
566		headers.Set("Durability-Level", strconv.FormatUint(input.DurabilityLevel, 10))
567	}
568	if input.ContentLength != 0 {
569		headers.Set("Content-Length", strconv.FormatUint(input.ContentLength, 10))
570	}
571	if input.ContentMD5 != "" {
572		headers.Set("Content-MD5", input.ContentMD5)
573	}
574
575	input.Body.ObjectPath = string(absPath)
576	reqInput := client.RequestInput{
577		Method:  http.MethodPost,
578		Path:    "/" + c.client.AccountName + "/uploads",
579		Headers: headers,
580		Body:    input.Body,
581	}
582	respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput)
583	if err != nil {
584		return nil, errors.Wrap(err, "unable to create mpu")
585	}
586	if respBody != nil {
587		defer respBody.Close()
588	}
589
590	response := &CreateMpuOutput{}
591	decoder := json.NewDecoder(respBody)
592	if err = decoder.Decode(&response); err != nil {
593		return nil, errors.Wrap(err, "unable to decode create mpu response")
594	}
595
596	return response, nil
597}
598
599func getMpu(c ObjectsClient, ctx context.Context, input *GetMpuInput) (*GetMpuOutput, error) {
600	headers := &http.Header{}
601
602	reqInput := client.RequestInput{
603		Method:  http.MethodGet,
604		Path:    input.PartsDirectoryPath + "/state",
605		Headers: headers,
606	}
607	respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput)
608	if err != nil {
609		return nil, errors.Wrap(err, "unable to get mpu")
610	}
611
612	response := &GetMpuOutput{}
613	decoder := json.NewDecoder(respBody)
614	if err = decoder.Decode(&response); err != nil {
615		return nil, errors.Wrap(err, "unable to decode get mpu response")
616	}
617
618	return response, nil
619}
620
621func listMpuParts(c ObjectsClient, ctx context.Context, input *ListMpuPartsInput) (*ListMpuPartsOutput, error) {
622	id := input.Id
623	idLength := len(id)
624	prefixLen, err := strconv.Atoi(id[idLength-1 : idLength])
625	if err != nil {
626		return nil, errors.Wrap(err, "unable to upload part")
627	}
628	prefix := id[:prefixLen]
629	partPath := "/" + c.client.AccountName + "/uploads/" + prefix + "/" + input.Id + "/"
630	listDirInput := ListDirectoryInput{
631		DirectoryName: partPath,
632	}
633
634	dirClient := &DirectoryClient{
635		client: c.client,
636	}
637
638	listDirOutput, err := dirClient.List(ctx, &listDirInput)
639	if err != nil {
640		return nil, errors.Wrap(err, "unable to list mpu parts")
641	}
642
643	var parts []ListMpuPart
644	for num, part := range listDirOutput.Entries {
645		parts = append(parts, ListMpuPart{
646			ETag:       part.ETag,
647			PartNumber: num,
648			Size:       int64(part.Size),
649		})
650	}
651
652	listMpuPartsOutput := &ListMpuPartsOutput{
653		Parts: parts,
654	}
655
656	return listMpuPartsOutput, nil
657}
658
659func uploadPart(c ObjectsClient, ctx context.Context, input *UploadPartInput) (*UploadPartOutput, error) {
660	headers := &http.Header{}
661	for key, value := range input.Headers {
662		headers.Set(key, value)
663	}
664
665	if input.ContentMD5 != "" {
666		headers.Set("Content-MD5", input.ContentMD5)
667	}
668
669	// The mpu directory prefix length is derived from the final character
670	// in the mpu identifier which we'll call P. The mpu prefix itself is
671	// the first P characters of the mpu identifier. In order to derive the
672	// correct directory structure we need to parse this information from
673	// the mpu identifier
674	id := input.Id
675	idLength := len(id)
676	partNum := strconv.FormatUint(input.PartNum, 10)
677	prefixLen, err := strconv.Atoi(id[idLength-1 : idLength])
678	if err != nil {
679		return nil, errors.Wrap(err, "unable to upload part due to invalid mpu prefix length")
680	}
681	prefix := id[:prefixLen]
682	partPath := "/" + c.client.AccountName + "/uploads/" + prefix + "/" + input.Id + "/" + partNum
683
684	reqInput := client.RequestNoEncodeInput{
685		Method:  http.MethodPut,
686		Path:    partPath,
687		Headers: headers,
688		Body:    input.ObjectReader,
689	}
690	respBody, respHeader, err := c.client.ExecuteRequestNoEncode(ctx, reqInput)
691	if respBody != nil {
692		defer respBody.Close()
693	}
694	if err != nil {
695		return nil, errors.Wrap(err, "unable to upload part")
696	}
697
698	uploadPartOutput := &UploadPartOutput{
699		Part: respHeader.Get("Etag"),
700	}
701	return uploadPartOutput, nil
702}
703
704func checkDirectoryTreeExists(c ObjectsClient, ctx context.Context, absPath _AbsCleanPath) (bool, error) {
705	exists, err := c.IsDir(ctx, string(absPath))
706	if err != nil {
707		if tt.IsResourceNotFoundError(err) || tt.IsStatusNotFoundCode(err) {
708			return false, nil
709		}
710		return false, err
711	}
712	if exists {
713		return true, nil
714	}
715
716	return false, nil
717}
718