1package manta 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "io" 8 "os" 9 "path" 10 "sort" 11 "strconv" 12 "strings" 13 "time" 14 15 metrics "github.com/armon/go-metrics" 16 log "github.com/hashicorp/go-hclog" 17 "github.com/hashicorp/vault/sdk/physical" 18 triton "github.com/joyent/triton-go" 19 "github.com/joyent/triton-go/authentication" 20 "github.com/joyent/triton-go/errors" 21 "github.com/joyent/triton-go/storage" 22) 23 24const mantaDefaultRootStore = "/stor" 25 26type MantaBackend struct { 27 logger log.Logger 28 permitPool *physical.PermitPool 29 client *storage.StorageClient 30 directory string 31} 32 33func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { 34 user := os.Getenv("MANTA_USER") 35 if user == "" { 36 user = conf["user"] 37 } 38 39 keyId := os.Getenv("MANTA_KEY_ID") 40 if keyId == "" { 41 keyId = conf["key_id"] 42 } 43 44 url := os.Getenv("MANTA_URL") 45 if url == "" { 46 url = conf["url"] 47 } else { 48 url = "https://us-east.manta.joyent.com" 49 } 50 51 subuser := os.Getenv("MANTA_SUBUSER") 52 if subuser == "" { 53 if confUser, ok := conf["subuser"]; ok { 54 subuser = confUser 55 } 56 } 57 58 input := authentication.SSHAgentSignerInput{ 59 KeyID: keyId, 60 AccountName: user, 61 Username: subuser, 62 } 63 signer, err := authentication.NewSSHAgentSigner(input) 64 if err != nil { 65 return nil, fmt.Errorf("Error Creating SSH Agent Signer: %w", err) 66 } 67 68 maxParStr, ok := conf["max_parallel"] 69 var maxParInt int 70 if ok { 71 maxParInt, err = strconv.Atoi(maxParStr) 72 if err != nil { 73 return nil, fmt.Errorf("failed parsing max_parallel parameter: %w", err) 74 } 75 if logger.IsDebug() { 76 logger.Debug("max_parallel set", "max_parallel", maxParInt) 77 } 78 } 79 80 config := &triton.ClientConfig{ 81 MantaURL: url, 82 AccountName: user, 83 Signers: []authentication.Signer{signer}, 84 } 85 86 client, err := storage.NewClient(config) 87 if err != nil { 88 return nil, fmt.Errorf("failed initialising Storage client: %w", err) 89 } 90 91 return &MantaBackend{ 92 client: client, 93 directory: conf["directory"], 94 logger: logger, 95 permitPool: physical.NewPermitPool(maxParInt), 96 }, nil 97} 98 99// Put is used to insert or update an entry 100func (m *MantaBackend) Put(ctx context.Context, entry *physical.Entry) error { 101 defer metrics.MeasureSince([]string{"manta", "put"}, time.Now()) 102 103 m.permitPool.Acquire() 104 defer m.permitPool.Release() 105 106 r := bytes.NewReader(entry.Value) 107 r.Seek(0, 0) 108 109 return m.client.Objects().Put(ctx, &storage.PutObjectInput{ 110 ObjectPath: path.Join(mantaDefaultRootStore, m.directory, entry.Key, ".vault_value"), 111 ObjectReader: r, 112 ContentLength: uint64(len(entry.Value)), 113 ForceInsert: true, 114 }) 115} 116 117// Get is used to fetch an entry 118func (m *MantaBackend) Get(ctx context.Context, key string) (*physical.Entry, error) { 119 defer metrics.MeasureSince([]string{"manta", "get"}, time.Now()) 120 121 m.permitPool.Acquire() 122 defer m.permitPool.Release() 123 124 output, err := m.client.Objects().Get(ctx, &storage.GetObjectInput{ 125 ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"), 126 }) 127 if err != nil { 128 if strings.Contains(err.Error(), "ResourceNotFound") { 129 return nil, nil 130 } 131 return nil, err 132 } 133 134 defer output.ObjectReader.Close() 135 136 data := make([]byte, output.ContentLength) 137 _, err = io.ReadFull(output.ObjectReader, data) 138 if err != nil { 139 return nil, err 140 } 141 142 ent := &physical.Entry{ 143 Key: key, 144 Value: data, 145 } 146 147 return ent, nil 148} 149 150// Delete is used to permanently delete an entry 151func (m *MantaBackend) Delete(ctx context.Context, key string) error { 152 defer metrics.MeasureSince([]string{"manta", "delete"}, time.Now()) 153 154 m.permitPool.Acquire() 155 defer m.permitPool.Release() 156 157 if strings.HasSuffix(key, "/") { 158 err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{ 159 DirectoryName: path.Join(mantaDefaultRootStore, m.directory, key), 160 ForceDelete: true, 161 }) 162 if err != nil { 163 return err 164 } 165 } else { 166 err := m.client.Objects().Delete(ctx, &storage.DeleteObjectInput{ 167 ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"), 168 }) 169 if err != nil { 170 if errors.IsResourceNotFound(err) { 171 return nil 172 } 173 return err 174 } 175 176 return tryDeleteDirectory(ctx, m, path.Join(mantaDefaultRootStore, m.directory, key)) 177 } 178 179 return nil 180} 181 182func tryDeleteDirectory(ctx context.Context, m *MantaBackend, directoryPath string) error { 183 objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{ 184 DirectoryName: directoryPath, 185 }) 186 if err != nil { 187 if errors.IsResourceNotFound(err) { 188 return nil 189 } 190 return err 191 } 192 if objs != nil && len(objs.Entries) == 0 { 193 err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{ 194 DirectoryName: directoryPath, 195 }) 196 if err != nil { 197 return err 198 } 199 200 return tryDeleteDirectory(ctx, m, path.Dir(directoryPath)) 201 } 202 return nil 203} 204 205// List is used to list all the keys under a given 206// prefix, up to the next prefix. 207func (m *MantaBackend) List(ctx context.Context, prefix string) ([]string, error) { 208 defer metrics.MeasureSince([]string{"manta", "list"}, time.Now()) 209 210 m.permitPool.Acquire() 211 defer m.permitPool.Release() 212 213 objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{ 214 DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix), 215 }) 216 if err != nil { 217 if errors.IsResourceNotFound(err) { 218 return []string{}, nil 219 } 220 return nil, err 221 } 222 223 keys := []string{} 224 for _, obj := range objs.Entries { 225 if obj.Type == "directory" { 226 objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{ 227 DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix, obj.Name), 228 }) 229 if err != nil { 230 if !errors.IsResourceNotFound(err) { 231 return nil, err 232 } 233 } 234 235 // We need to check to see if there is something more than just the `value` file 236 // if the length of the children is: 237 // > 1 and includes the value `index` then we need to add foo and foo/ 238 // = 1 and the value is `index` then we need to add foo 239 // = 1 and the value is not `index` then we need to add foo/ 240 if len(objs.Entries) == 1 { 241 if objs.Entries[0].Name != ".vault_value" { 242 keys = append(keys, fmt.Sprintf("%s/", obj.Name)) 243 } else { 244 keys = append(keys, obj.Name) 245 } 246 } else if len(objs.Entries) > 1 { 247 for _, childObj := range objs.Entries { 248 if childObj.Name == ".vault_value" { 249 keys = append(keys, obj.Name) 250 } else { 251 keys = append(keys, fmt.Sprintf("%s/", obj.Name)) 252 } 253 } 254 } else { 255 keys = append(keys, obj.Name) 256 } 257 } 258 } 259 260 sort.Strings(keys) 261 262 return keys, nil 263} 264