1package manta
2
3import (
4	"bytes"
5	"context"
6	"fmt"
7	"io"
8	"os"
9	"path"
10	"sort"
11	"strconv"
12	"strings"
13	"time"
14
15	metrics "github.com/armon/go-metrics"
16	log "github.com/hashicorp/go-hclog"
17	"github.com/hashicorp/vault/sdk/physical"
18	triton "github.com/joyent/triton-go"
19	"github.com/joyent/triton-go/authentication"
20	"github.com/joyent/triton-go/errors"
21	"github.com/joyent/triton-go/storage"
22)
23
// mantaDefaultRootStore is the leading path element of every object and
// directory path built by the backend methods in this file.
const mantaDefaultRootStore = "/stor"
25
// MantaBackend is the backend value returned by NewMantaBackend. Its methods
// keep each entry as a ".vault_value" object inside a directory named after
// the entry's key.
type MantaBackend struct {
	// logger is the logger handed to NewMantaBackend.
	logger     log.Logger
	// permitPool is acquired and released around every Put, Get, Delete and
	// List call.
	permitPool *physical.PermitPool
	// client is the storage client used for all object and directory calls.
	client     *storage.StorageClient
	// directory comes from conf["directory"] and is joined into every path
	// directly after mantaDefaultRootStore.
	directory  string
}
32
33func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
34	user := os.Getenv("MANTA_USER")
35	if user == "" {
36		user = conf["user"]
37	}
38
39	keyId := os.Getenv("MANTA_KEY_ID")
40	if keyId == "" {
41		keyId = conf["key_id"]
42	}
43
44	url := os.Getenv("MANTA_URL")
45	if url == "" {
46		url = conf["url"]
47	} else {
48		url = "https://us-east.manta.joyent.com"
49	}
50
51	subuser := os.Getenv("MANTA_SUBUSER")
52	if subuser == "" {
53		if confUser, ok := conf["subuser"]; ok {
54			subuser = confUser
55		}
56	}
57
58	input := authentication.SSHAgentSignerInput{
59		KeyID:       keyId,
60		AccountName: user,
61		Username:    subuser,
62	}
63	signer, err := authentication.NewSSHAgentSigner(input)
64	if err != nil {
65		return nil, fmt.Errorf("Error Creating SSH Agent Signer: %w", err)
66	}
67
68	maxParStr, ok := conf["max_parallel"]
69	var maxParInt int
70	if ok {
71		maxParInt, err = strconv.Atoi(maxParStr)
72		if err != nil {
73			return nil, fmt.Errorf("failed parsing max_parallel parameter: %w", err)
74		}
75		if logger.IsDebug() {
76			logger.Debug("max_parallel set", "max_parallel", maxParInt)
77		}
78	}
79
80	config := &triton.ClientConfig{
81		MantaURL:    url,
82		AccountName: user,
83		Signers:     []authentication.Signer{signer},
84	}
85
86	client, err := storage.NewClient(config)
87	if err != nil {
88		return nil, fmt.Errorf("failed initialising Storage client: %w", err)
89	}
90
91	return &MantaBackend{
92		client:     client,
93		directory:  conf["directory"],
94		logger:     logger,
95		permitPool: physical.NewPermitPool(maxParInt),
96	}, nil
97}
98
99// Put is used to insert or update an entry
100func (m *MantaBackend) Put(ctx context.Context, entry *physical.Entry) error {
101	defer metrics.MeasureSince([]string{"manta", "put"}, time.Now())
102
103	m.permitPool.Acquire()
104	defer m.permitPool.Release()
105
106	r := bytes.NewReader(entry.Value)
107	r.Seek(0, 0)
108
109	return m.client.Objects().Put(ctx, &storage.PutObjectInput{
110		ObjectPath:    path.Join(mantaDefaultRootStore, m.directory, entry.Key, ".vault_value"),
111		ObjectReader:  r,
112		ContentLength: uint64(len(entry.Value)),
113		ForceInsert:   true,
114	})
115}
116
117// Get is used to fetch an entry
118func (m *MantaBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
119	defer metrics.MeasureSince([]string{"manta", "get"}, time.Now())
120
121	m.permitPool.Acquire()
122	defer m.permitPool.Release()
123
124	output, err := m.client.Objects().Get(ctx, &storage.GetObjectInput{
125		ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"),
126	})
127	if err != nil {
128		if strings.Contains(err.Error(), "ResourceNotFound") {
129			return nil, nil
130		}
131		return nil, err
132	}
133
134	defer output.ObjectReader.Close()
135
136	data := make([]byte, output.ContentLength)
137	_, err = io.ReadFull(output.ObjectReader, data)
138	if err != nil {
139		return nil, err
140	}
141
142	ent := &physical.Entry{
143		Key:   key,
144		Value: data,
145	}
146
147	return ent, nil
148}
149
150// Delete is used to permanently delete an entry
151func (m *MantaBackend) Delete(ctx context.Context, key string) error {
152	defer metrics.MeasureSince([]string{"manta", "delete"}, time.Now())
153
154	m.permitPool.Acquire()
155	defer m.permitPool.Release()
156
157	if strings.HasSuffix(key, "/") {
158		err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{
159			DirectoryName: path.Join(mantaDefaultRootStore, m.directory, key),
160			ForceDelete:   true,
161		})
162		if err != nil {
163			return err
164		}
165	} else {
166		err := m.client.Objects().Delete(ctx, &storage.DeleteObjectInput{
167			ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"),
168		})
169		if err != nil {
170			if errors.IsResourceNotFound(err) {
171				return nil
172			}
173			return err
174		}
175
176		return tryDeleteDirectory(ctx, m, path.Join(mantaDefaultRootStore, m.directory, key))
177	}
178
179	return nil
180}
181
182func tryDeleteDirectory(ctx context.Context, m *MantaBackend, directoryPath string) error {
183	objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
184		DirectoryName: directoryPath,
185	})
186	if err != nil {
187		if errors.IsResourceNotFound(err) {
188			return nil
189		}
190		return err
191	}
192	if objs != nil && len(objs.Entries) == 0 {
193		err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{
194			DirectoryName: directoryPath,
195		})
196		if err != nil {
197			return err
198		}
199
200		return tryDeleteDirectory(ctx, m, path.Dir(directoryPath))
201	}
202	return nil
203}
204
205// List is used to list all the keys under a given
206// prefix, up to the next prefix.
207func (m *MantaBackend) List(ctx context.Context, prefix string) ([]string, error) {
208	defer metrics.MeasureSince([]string{"manta", "list"}, time.Now())
209
210	m.permitPool.Acquire()
211	defer m.permitPool.Release()
212
213	objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
214		DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix),
215	})
216	if err != nil {
217		if errors.IsResourceNotFound(err) {
218			return []string{}, nil
219		}
220		return nil, err
221	}
222
223	keys := []string{}
224	for _, obj := range objs.Entries {
225		if obj.Type == "directory" {
226			objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
227				DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix, obj.Name),
228			})
229			if err != nil {
230				if !errors.IsResourceNotFound(err) {
231					return nil, err
232				}
233			}
234
235			// We need to check to see if there is something more than just the `value` file
236			// if the length of the children is:
237			// > 1 and includes the value `index` then we need to add foo and foo/
238			// = 1 and the value is `index` then we need to add foo
239			// = 1 and the value is not `index` then we need to add foo/
240			if len(objs.Entries) == 1 {
241				if objs.Entries[0].Name != ".vault_value" {
242					keys = append(keys, fmt.Sprintf("%s/", obj.Name))
243				} else {
244					keys = append(keys, obj.Name)
245				}
246			} else if len(objs.Entries) > 1 {
247				for _, childObj := range objs.Entries {
248					if childObj.Name == ".vault_value" {
249						keys = append(keys, obj.Name)
250					} else {
251						keys = append(keys, fmt.Sprintf("%s/", obj.Name))
252					}
253				}
254			} else {
255				keys = append(keys, obj.Name)
256			}
257		}
258	}
259
260	sort.Strings(keys)
261
262	return keys, nil
263}
264