1package manta
2
3import (
4	"bytes"
5	"context"
6	"fmt"
7	"io"
8	"os"
9	"path"
10	"sort"
11	"strconv"
12	"strings"
13	"time"
14
15	metrics "github.com/armon/go-metrics"
16	"github.com/hashicorp/errwrap"
17	log "github.com/hashicorp/go-hclog"
18	"github.com/hashicorp/vault/sdk/physical"
19	triton "github.com/joyent/triton-go"
20	"github.com/joyent/triton-go/authentication"
21	"github.com/joyent/triton-go/errors"
22	"github.com/joyent/triton-go/storage"
23)
24
// mantaDefaultRootStore is the top-level Manta path under which the backend's
// configured directory and every key path are joined.
const mantaDefaultRootStore = "/stor"
26
// MantaBackend is a physical storage backend that keeps each entry as a
// ".vault_value" object inside a directory named after the entry's key,
// rooted at mantaDefaultRootStore/<directory>.
type MantaBackend struct {
	// logger is the logger handed to NewMantaBackend.
	logger log.Logger
	// permitPool is acquired and released around every backend operation;
	// it is sized from the "max_parallel" configuration value.
	permitPool *physical.PermitPool
	// client is the storage client used for all object and directory calls.
	client *storage.StorageClient
	// directory is the "directory" configuration value; all keys live below
	// path.Join(mantaDefaultRootStore, directory).
	directory string
}
33
34func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
35	user := os.Getenv("MANTA_USER")
36	if user == "" {
37		user = conf["user"]
38	}
39
40	keyId := os.Getenv("MANTA_KEY_ID")
41	if keyId == "" {
42		keyId = conf["key_id"]
43	}
44
45	url := os.Getenv("MANTA_URL")
46	if url == "" {
47		url = conf["url"]
48	} else {
49		url = "https://us-east.manta.joyent.com"
50	}
51
52	subuser := os.Getenv("MANTA_SUBUSER")
53	if subuser == "" {
54		if confUser, ok := conf["subuser"]; ok {
55			subuser = confUser
56		}
57	}
58
59	input := authentication.SSHAgentSignerInput{
60		KeyID:       keyId,
61		AccountName: user,
62		Username:    subuser,
63	}
64	signer, err := authentication.NewSSHAgentSigner(input)
65	if err != nil {
66		return nil, errwrap.Wrapf("Error Creating SSH Agent Signer: {{err}}", err)
67	}
68
69	maxParStr, ok := conf["max_parallel"]
70	var maxParInt int
71	if ok {
72		maxParInt, err = strconv.Atoi(maxParStr)
73		if err != nil {
74			return nil, errwrap.Wrapf("failed parsing max_parallel parameter: {{err}}", err)
75		}
76		if logger.IsDebug() {
77			logger.Debug("max_parallel set", "max_parallel", maxParInt)
78		}
79	}
80
81	config := &triton.ClientConfig{
82		MantaURL:    url,
83		AccountName: user,
84		Signers:     []authentication.Signer{signer},
85	}
86
87	client, err := storage.NewClient(config)
88	if err != nil {
89		return nil, errwrap.Wrapf("failed initialising Storage client: {{err}}", err)
90	}
91
92	return &MantaBackend{
93		client:     client,
94		directory:  conf["directory"],
95		logger:     logger,
96		permitPool: physical.NewPermitPool(maxParInt),
97	}, nil
98}
99
100// Put is used to insert or update an entry
101func (m *MantaBackend) Put(ctx context.Context, entry *physical.Entry) error {
102	defer metrics.MeasureSince([]string{"manta", "put"}, time.Now())
103
104	m.permitPool.Acquire()
105	defer m.permitPool.Release()
106
107	r := bytes.NewReader(entry.Value)
108	r.Seek(0, 0)
109
110	return m.client.Objects().Put(ctx, &storage.PutObjectInput{
111		ObjectPath:    path.Join(mantaDefaultRootStore, m.directory, entry.Key, ".vault_value"),
112		ObjectReader:  r,
113		ContentLength: uint64(len(entry.Value)),
114		ForceInsert:   true,
115	})
116}
117
118// Get is used to fetch an entry
119func (m *MantaBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
120	defer metrics.MeasureSince([]string{"manta", "get"}, time.Now())
121
122	m.permitPool.Acquire()
123	defer m.permitPool.Release()
124
125	output, err := m.client.Objects().Get(ctx, &storage.GetObjectInput{
126		ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"),
127	})
128	if err != nil {
129		if strings.Contains(err.Error(), "ResourceNotFound") {
130			return nil, nil
131		}
132		return nil, err
133	}
134
135	defer output.ObjectReader.Close()
136
137	data := make([]byte, output.ContentLength)
138	_, err = io.ReadFull(output.ObjectReader, data)
139	if err != nil {
140		return nil, err
141	}
142
143	ent := &physical.Entry{
144		Key:   key,
145		Value: data,
146	}
147
148	return ent, nil
149}
150
151// Delete is used to permanently delete an entry
152func (m *MantaBackend) Delete(ctx context.Context, key string) error {
153	defer metrics.MeasureSince([]string{"manta", "delete"}, time.Now())
154
155	m.permitPool.Acquire()
156	defer m.permitPool.Release()
157
158	if strings.HasSuffix(key, "/") {
159		err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{
160			DirectoryName: path.Join(mantaDefaultRootStore, m.directory, key),
161			ForceDelete:   true,
162		})
163		if err != nil {
164			return err
165		}
166	} else {
167		err := m.client.Objects().Delete(ctx, &storage.DeleteObjectInput{
168			ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"),
169		})
170		if err != nil {
171			if errors.IsResourceNotFound(err) {
172				return nil
173			}
174			return err
175		}
176
177		return tryDeleteDirectory(ctx, m, path.Join(mantaDefaultRootStore, m.directory, key))
178	}
179
180	return nil
181}
182
183func tryDeleteDirectory(ctx context.Context, m *MantaBackend, directoryPath string) error {
184	objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
185		DirectoryName: directoryPath,
186	})
187	if err != nil {
188		if errors.IsResourceNotFound(err) {
189			return nil
190		}
191		return err
192	}
193	if objs != nil && len(objs.Entries) == 0 {
194		err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{
195			DirectoryName: directoryPath,
196		})
197		if err != nil {
198			return err
199		}
200
201		return tryDeleteDirectory(ctx, m, path.Dir(directoryPath))
202	}
203	return nil
204}
205
206// List is used to list all the keys under a given
207// prefix, up to the next prefix.
208func (m *MantaBackend) List(ctx context.Context, prefix string) ([]string, error) {
209	defer metrics.MeasureSince([]string{"manta", "list"}, time.Now())
210
211	m.permitPool.Acquire()
212	defer m.permitPool.Release()
213
214	objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
215		DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix),
216	})
217	if err != nil {
218		if errors.IsResourceNotFound(err) {
219			return []string{}, nil
220		}
221		return nil, err
222	}
223
224	keys := []string{}
225	for _, obj := range objs.Entries {
226		if obj.Type == "directory" {
227			objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
228				DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix, obj.Name),
229			})
230			if err != nil {
231				if !errors.IsResourceNotFound(err) {
232					return nil, err
233				}
234			}
235
236			//We need to check to see if there is something more than just the `value` file
237			//if the length of the children is:
238			// > 1 and includes the value `index` then we need to add foo and foo/
239			// = 1 and the value is `index` then we need to add foo
240			// = 1 and the value is not `index` then we need to add foo/
241			if len(objs.Entries) == 1 {
242				if objs.Entries[0].Name != ".vault_value" {
243					keys = append(keys, fmt.Sprintf("%s/", obj.Name))
244				} else {
245					keys = append(keys, obj.Name)
246				}
247			} else if len(objs.Entries) > 1 {
248				for _, childObj := range objs.Entries {
249					if childObj.Name == ".vault_value" {
250						keys = append(keys, obj.Name)
251					} else {
252						keys = append(keys, fmt.Sprintf("%s/", obj.Name))
253					}
254				}
255			} else {
256				keys = append(keys, obj.Name)
257			}
258		}
259	}
260
261	sort.Strings(keys)
262
263	return keys, nil
264}
265