1package manta 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "path" 10 "sort" 11 "strconv" 12 "strings" 13 "time" 14 15 metrics "github.com/armon/go-metrics" 16 "github.com/hashicorp/errwrap" 17 log "github.com/hashicorp/go-hclog" 18 "github.com/hashicorp/vault/sdk/physical" 19 triton "github.com/joyent/triton-go" 20 "github.com/joyent/triton-go/authentication" 21 "github.com/joyent/triton-go/errors" 22 "github.com/joyent/triton-go/storage" 23) 24 25const mantaDefaultRootStore = "/stor" 26 27type MantaBackend struct { 28 logger log.Logger 29 permitPool *physical.PermitPool 30 client *storage.StorageClient 31 directory string 32} 33 34func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { 35 user := os.Getenv("MANTA_USER") 36 if user == "" { 37 user = conf["user"] 38 } 39 40 keyId := os.Getenv("MANTA_KEY_ID") 41 if keyId == "" { 42 keyId = conf["key_id"] 43 } 44 45 url := os.Getenv("MANTA_URL") 46 if url == "" { 47 url = conf["url"] 48 } else { 49 url = "https://us-east.manta.joyent.com" 50 } 51 52 subuser := os.Getenv("MANTA_SUBUSER") 53 if subuser == "" { 54 if confUser, ok := conf["subuser"]; ok { 55 subuser = confUser 56 } 57 } 58 59 input := authentication.SSHAgentSignerInput{ 60 KeyID: keyId, 61 AccountName: user, 62 Username: subuser, 63 } 64 signer, err := authentication.NewSSHAgentSigner(input) 65 if err != nil { 66 return nil, errwrap.Wrapf("Error Creating SSH Agent Signer: {{err}}", err) 67 } 68 69 maxParStr, ok := conf["max_parallel"] 70 var maxParInt int 71 if ok { 72 maxParInt, err = strconv.Atoi(maxParStr) 73 if err != nil { 74 return nil, errwrap.Wrapf("failed parsing max_parallel parameter: {{err}}", err) 75 } 76 if logger.IsDebug() { 77 logger.Debug("max_parallel set", "max_parallel", maxParInt) 78 } 79 } 80 81 config := &triton.ClientConfig{ 82 MantaURL: url, 83 AccountName: user, 84 Signers: []authentication.Signer{signer}, 85 } 86 87 client, err := storage.NewClient(config) 88 if err != nil { 89 return nil, errwrap.Wrapf("failed initialising Storage client: {{err}}", err) 90 } 91 92 return &MantaBackend{ 93 client: client, 94 directory: conf["directory"], 95 logger: logger, 96 permitPool: physical.NewPermitPool(maxParInt), 97 }, nil 98} 99 100// Put is used to insert or update an entry 101func (m *MantaBackend) Put(ctx context.Context, entry *physical.Entry) error { 102 defer metrics.MeasureSince([]string{"manta", "put"}, time.Now()) 103 104 m.permitPool.Acquire() 105 defer m.permitPool.Release() 106 107 r := bytes.NewReader(entry.Value) 108 r.Seek(0, 0) 109 110 return m.client.Objects().Put(ctx, &storage.PutObjectInput{ 111 ObjectPath: path.Join(mantaDefaultRootStore, m.directory, entry.Key, ".vault_value"), 112 ObjectReader: r, 113 ContentLength: uint64(len(entry.Value)), 114 ForceInsert: true, 115 }) 116} 117 118// Get is used to fetch an entry 119func (m *MantaBackend) Get(ctx context.Context, key string) (*physical.Entry, error) { 120 defer metrics.MeasureSince([]string{"manta", "get"}, time.Now()) 121 122 m.permitPool.Acquire() 123 defer m.permitPool.Release() 124 125 output, err := m.client.Objects().Get(ctx, &storage.GetObjectInput{ 126 ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"), 127 }) 128 if err != nil { 129 if strings.Contains(err.Error(), "ResourceNotFound") { 130 return nil, nil 131 } 132 return nil, err 133 } 134 135 defer output.ObjectReader.Close() 136 137 data := make([]byte, output.ContentLength) 138 _, err = io.ReadFull(output.ObjectReader, data) 139 if err != nil { 140 return nil, err 141 } 142 143 ent := &physical.Entry{ 144 Key: key, 145 Value: data, 146 } 147 148 return ent, nil 149} 150 151// Delete is used to permanently delete an entry 152func (m *MantaBackend) Delete(ctx context.Context, key string) error { 153 defer metrics.MeasureSince([]string{"manta", "delete"}, time.Now()) 154 155 m.permitPool.Acquire() 156 defer m.permitPool.Release() 157 158 if strings.HasSuffix(key, "/") { 159 err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{ 160 DirectoryName: path.Join(mantaDefaultRootStore, m.directory, key), 161 ForceDelete: true, 162 }) 163 if err != nil { 164 return err 165 } 166 } else { 167 err := m.client.Objects().Delete(ctx, &storage.DeleteObjectInput{ 168 ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"), 169 }) 170 if err != nil { 171 if errors.IsResourceNotFound(err) { 172 return nil 173 } 174 return err 175 } 176 177 return tryDeleteDirectory(ctx, m, path.Join(mantaDefaultRootStore, m.directory, key)) 178 } 179 180 return nil 181} 182 183func tryDeleteDirectory(ctx context.Context, m *MantaBackend, directoryPath string) error { 184 objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{ 185 DirectoryName: directoryPath, 186 }) 187 if err != nil { 188 if errors.IsResourceNotFound(err) { 189 return nil 190 } 191 return err 192 } 193 if objs != nil && len(objs.Entries) == 0 { 194 err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{ 195 DirectoryName: directoryPath, 196 }) 197 if err != nil { 198 return err 199 } 200 201 return tryDeleteDirectory(ctx, m, path.Dir(directoryPath)) 202 } 203 return nil 204} 205 206// List is used to list all the keys under a given 207// prefix, up to the next prefix. 208func (m *MantaBackend) List(ctx context.Context, prefix string) ([]string, error) { 209 defer metrics.MeasureSince([]string{"manta", "list"}, time.Now()) 210 211 m.permitPool.Acquire() 212 defer m.permitPool.Release() 213 214 objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{ 215 DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix), 216 }) 217 if err != nil { 218 if errors.IsResourceNotFound(err) { 219 return []string{}, nil 220 } 221 return nil, err 222 } 223 224 keys := []string{} 225 for _, obj := range objs.Entries { 226 if obj.Type == "directory" { 227 objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{ 228 DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix, obj.Name), 229 }) 230 if err != nil { 231 if !errors.IsResourceNotFound(err) { 232 return nil, err 233 } 234 } 235 236 //We need to check to see if there is something more than just the `value` file 237 //if the length of the children is: 238 // > 1 and includes the value `index` then we need to add foo and foo/ 239 // = 1 and the value is `index` then we need to add foo 240 // = 1 and the value is not `index` then we need to add foo/ 241 if len(objs.Entries) == 1 { 242 if objs.Entries[0].Name != ".vault_value" { 243 keys = append(keys, fmt.Sprintf("%s/", obj.Name)) 244 } else { 245 keys = append(keys, obj.Name) 246 } 247 } else if len(objs.Entries) > 1 { 248 for _, childObj := range objs.Entries { 249 if childObj.Name == ".vault_value" { 250 keys = append(keys, obj.Name) 251 } else { 252 keys = append(keys, fmt.Sprintf("%s/", obj.Name)) 253 } 254 } 255 } else { 256 keys = append(keys, obj.Name) 257 } 258 } 259 } 260 261 sort.Strings(keys) 262 263 return keys, nil 264} 265