1package filesystem 2 3import ( 4 "bufio" 5 "bytes" 6 "context" 7 "fmt" 8 "io" 9 "io/ioutil" 10 "os" 11 "path" 12 "time" 13 14 storagedriver "github.com/docker/distribution/registry/storage/driver" 15 "github.com/docker/distribution/registry/storage/driver/base" 16 "github.com/docker/distribution/registry/storage/driver/factory" 17) 18 19const ( 20 driverName = "filesystem" 21 defaultRootDirectory = "/var/lib/registry" 22 defaultMaxThreads = uint64(100) 23 24 // minThreads is the minimum value for the maxthreads configuration 25 // parameter. If the driver's parameters are less than this we set 26 // the parameters to minThreads 27 minThreads = uint64(25) 28) 29 30// DriverParameters represents all configuration options available for the 31// filesystem driver 32type DriverParameters struct { 33 RootDirectory string 34 MaxThreads uint64 35} 36 37func init() { 38 factory.Register(driverName, &filesystemDriverFactory{}) 39} 40 41// filesystemDriverFactory implements the factory.StorageDriverFactory interface 42type filesystemDriverFactory struct{} 43 44func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { 45 return FromParameters(parameters) 46} 47 48type driver struct { 49 rootDirectory string 50} 51 52type baseEmbed struct { 53 base.Base 54} 55 56// Driver is a storagedriver.StorageDriver implementation backed by a local 57// filesystem. All provided paths will be subpaths of the RootDirectory. 58type Driver struct { 59 baseEmbed 60} 61 62// FromParameters constructs a new Driver with a given parameters map 63// Optional Parameters: 64// - rootdirectory 65// - maxthreads 66func FromParameters(parameters map[string]interface{}) (*Driver, error) { 67 params, err := fromParametersImpl(parameters) 68 if err != nil || params == nil { 69 return nil, err 70 } 71 return New(*params), nil 72} 73 74func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) { 75 var ( 76 err error 77 maxThreads = defaultMaxThreads 78 rootDirectory = defaultRootDirectory 79 ) 80 81 if parameters != nil { 82 if rootDir, ok := parameters["rootdirectory"]; ok { 83 rootDirectory = fmt.Sprint(rootDir) 84 } 85 86 maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads) 87 if err != nil { 88 return nil, fmt.Errorf("maxthreads config error: %s", err.Error()) 89 } 90 } 91 92 params := &DriverParameters{ 93 RootDirectory: rootDirectory, 94 MaxThreads: maxThreads, 95 } 96 return params, nil 97} 98 99// New constructs a new Driver with a given rootDirectory 100func New(params DriverParameters) *Driver { 101 fsDriver := &driver{rootDirectory: params.RootDirectory} 102 103 return &Driver{ 104 baseEmbed: baseEmbed{ 105 Base: base.Base{ 106 StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads), 107 }, 108 }, 109 } 110} 111 112// Implement the storagedriver.StorageDriver interface 113 114func (d *driver) Name() string { 115 return driverName 116} 117 118// GetContent retrieves the content stored at "path" as a []byte. 119func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { 120 rc, err := d.Reader(ctx, path, 0) 121 if err != nil { 122 return nil, err 123 } 124 defer rc.Close() 125 126 p, err := ioutil.ReadAll(rc) 127 if err != nil { 128 return nil, err 129 } 130 131 return p, nil 132} 133 134// PutContent stores the []byte content at a location designated by "path". 135func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error { 136 writer, err := d.Writer(ctx, subPath, false) 137 if err != nil { 138 return err 139 } 140 defer writer.Close() 141 _, err = io.Copy(writer, bytes.NewReader(contents)) 142 if err != nil { 143 writer.Cancel() 144 return err 145 } 146 return writer.Commit() 147} 148 149// Reader retrieves an io.ReadCloser for the content stored at "path" with a 150// given byte offset. 151func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { 152 file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644) 153 if err != nil { 154 if os.IsNotExist(err) { 155 return nil, storagedriver.PathNotFoundError{Path: path} 156 } 157 158 return nil, err 159 } 160 161 seekPos, err := file.Seek(offset, io.SeekStart) 162 if err != nil { 163 file.Close() 164 return nil, err 165 } else if seekPos < offset { 166 file.Close() 167 return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} 168 } 169 170 return file, nil 171} 172 173func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) { 174 fullPath := d.fullPath(subPath) 175 parentDir := path.Dir(fullPath) 176 if err := os.MkdirAll(parentDir, 0777); err != nil { 177 return nil, err 178 } 179 180 fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0666) 181 if err != nil { 182 return nil, err 183 } 184 185 var offset int64 186 187 if !append { 188 err := fp.Truncate(0) 189 if err != nil { 190 fp.Close() 191 return nil, err 192 } 193 } else { 194 n, err := fp.Seek(0, io.SeekEnd) 195 if err != nil { 196 fp.Close() 197 return nil, err 198 } 199 offset = n 200 } 201 202 return newFileWriter(fp, offset), nil 203} 204 205// Stat retrieves the FileInfo for the given path, including the current size 206// in bytes and the creation time. 207func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) { 208 fullPath := d.fullPath(subPath) 209 210 fi, err := os.Stat(fullPath) 211 if err != nil { 212 if os.IsNotExist(err) { 213 return nil, storagedriver.PathNotFoundError{Path: subPath} 214 } 215 216 return nil, err 217 } 218 219 return fileInfo{ 220 path: subPath, 221 FileInfo: fi, 222 }, nil 223} 224 225// List returns a list of the objects that are direct descendants of the given 226// path. 227func (d *driver) List(ctx context.Context, subPath string) ([]string, error) { 228 fullPath := d.fullPath(subPath) 229 230 dir, err := os.Open(fullPath) 231 if err != nil { 232 if os.IsNotExist(err) { 233 return nil, storagedriver.PathNotFoundError{Path: subPath} 234 } 235 return nil, err 236 } 237 238 defer dir.Close() 239 240 fileNames, err := dir.Readdirnames(0) 241 if err != nil { 242 return nil, err 243 } 244 245 keys := make([]string, 0, len(fileNames)) 246 for _, fileName := range fileNames { 247 keys = append(keys, path.Join(subPath, fileName)) 248 } 249 250 return keys, nil 251} 252 253// Move moves an object stored at sourcePath to destPath, removing the original 254// object. 255func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { 256 source := d.fullPath(sourcePath) 257 dest := d.fullPath(destPath) 258 259 if _, err := os.Stat(source); os.IsNotExist(err) { 260 return storagedriver.PathNotFoundError{Path: sourcePath} 261 } 262 263 if err := os.MkdirAll(path.Dir(dest), 0755); err != nil { 264 return err 265 } 266 267 err := os.Rename(source, dest) 268 return err 269} 270 271// Delete recursively deletes all objects stored at "path" and its subpaths. 272func (d *driver) Delete(ctx context.Context, subPath string) error { 273 fullPath := d.fullPath(subPath) 274 275 _, err := os.Stat(fullPath) 276 if err != nil && !os.IsNotExist(err) { 277 return err 278 } else if err != nil { 279 return storagedriver.PathNotFoundError{Path: subPath} 280 } 281 282 err = os.RemoveAll(fullPath) 283 return err 284} 285 286// URLFor returns a URL which may be used to retrieve the content stored at the given path. 287// May return an UnsupportedMethodErr in certain StorageDriver implementations. 288func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { 289 return "", storagedriver.ErrUnsupportedMethod{} 290} 291 292// Walk traverses a filesystem defined within driver, starting 293// from the given path, calling f on each file 294func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error { 295 return storagedriver.WalkFallback(ctx, d, path, f) 296} 297 298// fullPath returns the absolute path of a key within the Driver's storage. 299func (d *driver) fullPath(subPath string) string { 300 return path.Join(d.rootDirectory, subPath) 301} 302 303type fileInfo struct { 304 os.FileInfo 305 path string 306} 307 308var _ storagedriver.FileInfo = fileInfo{} 309 310// Path provides the full path of the target of this file info. 311func (fi fileInfo) Path() string { 312 return fi.path 313} 314 315// Size returns current length in bytes of the file. The return value can 316// be used to write to the end of the file at path. The value is 317// meaningless if IsDir returns true. 318func (fi fileInfo) Size() int64 { 319 if fi.IsDir() { 320 return 0 321 } 322 323 return fi.FileInfo.Size() 324} 325 326// ModTime returns the modification time for the file. For backends that 327// don't have a modification time, the creation time should be returned. 328func (fi fileInfo) ModTime() time.Time { 329 return fi.FileInfo.ModTime() 330} 331 332// IsDir returns true if the path is a directory. 333func (fi fileInfo) IsDir() bool { 334 return fi.FileInfo.IsDir() 335} 336 337type fileWriter struct { 338 file *os.File 339 size int64 340 bw *bufio.Writer 341 closed bool 342 committed bool 343 cancelled bool 344} 345 346func newFileWriter(file *os.File, size int64) *fileWriter { 347 return &fileWriter{ 348 file: file, 349 size: size, 350 bw: bufio.NewWriter(file), 351 } 352} 353 354func (fw *fileWriter) Write(p []byte) (int, error) { 355 if fw.closed { 356 return 0, fmt.Errorf("already closed") 357 } else if fw.committed { 358 return 0, fmt.Errorf("already committed") 359 } else if fw.cancelled { 360 return 0, fmt.Errorf("already cancelled") 361 } 362 n, err := fw.bw.Write(p) 363 fw.size += int64(n) 364 return n, err 365} 366 367func (fw *fileWriter) Size() int64 { 368 return fw.size 369} 370 371func (fw *fileWriter) Close() error { 372 if fw.closed { 373 return fmt.Errorf("already closed") 374 } 375 376 if err := fw.bw.Flush(); err != nil { 377 return err 378 } 379 380 if err := fw.file.Sync(); err != nil { 381 return err 382 } 383 384 if err := fw.file.Close(); err != nil { 385 return err 386 } 387 fw.closed = true 388 return nil 389} 390 391func (fw *fileWriter) Cancel() error { 392 if fw.closed { 393 return fmt.Errorf("already closed") 394 } 395 396 fw.cancelled = true 397 fw.file.Close() 398 return os.Remove(fw.file.Name()) 399} 400 401func (fw *fileWriter) Commit() error { 402 if fw.closed { 403 return fmt.Errorf("already closed") 404 } else if fw.committed { 405 return fmt.Errorf("already committed") 406 } else if fw.cancelled { 407 return fmt.Errorf("already cancelled") 408 } 409 410 if err := fw.bw.Flush(); err != nil { 411 return err 412 } 413 414 if err := fw.file.Sync(); err != nil { 415 return err 416 } 417 418 fw.committed = true 419 return nil 420} 421