1package filesystem 2 3import ( 4 "bufio" 5 "bytes" 6 "fmt" 7 "io" 8 "io/ioutil" 9 "os" 10 "path" 11 "reflect" 12 "strconv" 13 "time" 14 15 "github.com/docker/distribution/context" 16 storagedriver "github.com/docker/distribution/registry/storage/driver" 17 "github.com/docker/distribution/registry/storage/driver/base" 18 "github.com/docker/distribution/registry/storage/driver/factory" 19) 20 21const ( 22 driverName = "filesystem" 23 defaultRootDirectory = "/var/lib/registry" 24 defaultMaxThreads = uint64(100) 25 26 // minThreads is the minimum value for the maxthreads configuration 27 // parameter. If the driver's parameters are less than this we set 28 // the parameters to minThreads 29 minThreads = uint64(25) 30) 31 32// DriverParameters represents all configuration options available for the 33// filesystem driver 34type DriverParameters struct { 35 RootDirectory string 36 MaxThreads uint64 37} 38 39func init() { 40 factory.Register(driverName, &filesystemDriverFactory{}) 41} 42 43// filesystemDriverFactory implements the factory.StorageDriverFactory interface 44type filesystemDriverFactory struct{} 45 46func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) { 47 return FromParameters(parameters) 48} 49 50type driver struct { 51 rootDirectory string 52} 53 54type baseEmbed struct { 55 base.Base 56} 57 58// Driver is a storagedriver.StorageDriver implementation backed by a local 59// filesystem. All provided paths will be subpaths of the RootDirectory. 60type Driver struct { 61 baseEmbed 62} 63 64// FromParameters constructs a new Driver with a given parameters map 65// Optional Parameters: 66// - rootdirectory 67// - maxthreads 68func FromParameters(parameters map[string]interface{}) (*Driver, error) { 69 params, err := fromParametersImpl(parameters) 70 if err != nil || params == nil { 71 return nil, err 72 } 73 return New(*params), nil 74} 75 76func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) { 77 var ( 78 err error 79 maxThreads = defaultMaxThreads 80 rootDirectory = defaultRootDirectory 81 ) 82 83 if parameters != nil { 84 if rootDir, ok := parameters["rootdirectory"]; ok { 85 rootDirectory = fmt.Sprint(rootDir) 86 } 87 88 // Get maximum number of threads for blocking filesystem operations, 89 // if specified 90 threads := parameters["maxthreads"] 91 switch v := threads.(type) { 92 case string: 93 if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil { 94 return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads) 95 } 96 case uint64: 97 maxThreads = v 98 case int, int32, int64: 99 val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int() 100 // If threads is negative casting to uint64 will wrap around and 101 // give you the hugest thread limit ever. Let's be sensible, here 102 if val > 0 { 103 maxThreads = uint64(val) 104 } 105 case uint, uint32: 106 maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint() 107 case nil: 108 // do nothing 109 default: 110 return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads) 111 } 112 113 if maxThreads < minThreads { 114 maxThreads = minThreads 115 } 116 } 117 118 params := &DriverParameters{ 119 RootDirectory: rootDirectory, 120 MaxThreads: maxThreads, 121 } 122 return params, nil 123} 124 125// New constructs a new Driver with a given rootDirectory 126func New(params DriverParameters) *Driver { 127 fsDriver := &driver{rootDirectory: params.RootDirectory} 128 129 return &Driver{ 130 baseEmbed: baseEmbed{ 131 Base: base.Base{ 132 StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads), 133 }, 134 }, 135 } 136} 137 138// Implement the storagedriver.StorageDriver interface 139 140func (d *driver) Name() string { 141 return driverName 142} 143 144// GetContent retrieves the content stored at "path" as a []byte. 145func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { 146 rc, err := d.Reader(ctx, path, 0) 147 if err != nil { 148 return nil, err 149 } 150 defer rc.Close() 151 152 p, err := ioutil.ReadAll(rc) 153 if err != nil { 154 return nil, err 155 } 156 157 return p, nil 158} 159 160// PutContent stores the []byte content at a location designated by "path". 161func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error { 162 writer, err := d.Writer(ctx, subPath, false) 163 if err != nil { 164 return err 165 } 166 defer writer.Close() 167 _, err = io.Copy(writer, bytes.NewReader(contents)) 168 if err != nil { 169 writer.Cancel() 170 return err 171 } 172 return writer.Commit() 173} 174 175// Reader retrieves an io.ReadCloser for the content stored at "path" with a 176// given byte offset. 177func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { 178 file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644) 179 if err != nil { 180 if os.IsNotExist(err) { 181 return nil, storagedriver.PathNotFoundError{Path: path} 182 } 183 184 return nil, err 185 } 186 187 seekPos, err := file.Seek(int64(offset), os.SEEK_SET) 188 if err != nil { 189 file.Close() 190 return nil, err 191 } else if seekPos < int64(offset) { 192 file.Close() 193 return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} 194 } 195 196 return file, nil 197} 198 199func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) { 200 fullPath := d.fullPath(subPath) 201 parentDir := path.Dir(fullPath) 202 if err := os.MkdirAll(parentDir, 0777); err != nil { 203 return nil, err 204 } 205 206 fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0666) 207 if err != nil { 208 return nil, err 209 } 210 211 var offset int64 212 213 if !append { 214 err := fp.Truncate(0) 215 if err != nil { 216 fp.Close() 217 return nil, err 218 } 219 } else { 220 n, err := fp.Seek(0, os.SEEK_END) 221 if err != nil { 222 fp.Close() 223 return nil, err 224 } 225 offset = int64(n) 226 } 227 228 return newFileWriter(fp, offset), nil 229} 230 231// Stat retrieves the FileInfo for the given path, including the current size 232// in bytes and the creation time. 233func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) { 234 fullPath := d.fullPath(subPath) 235 236 fi, err := os.Stat(fullPath) 237 if err != nil { 238 if os.IsNotExist(err) { 239 return nil, storagedriver.PathNotFoundError{Path: subPath} 240 } 241 242 return nil, err 243 } 244 245 return fileInfo{ 246 path: subPath, 247 FileInfo: fi, 248 }, nil 249} 250 251// List returns a list of the objects that are direct descendants of the given 252// path. 253func (d *driver) List(ctx context.Context, subPath string) ([]string, error) { 254 fullPath := d.fullPath(subPath) 255 256 dir, err := os.Open(fullPath) 257 if err != nil { 258 if os.IsNotExist(err) { 259 return nil, storagedriver.PathNotFoundError{Path: subPath} 260 } 261 return nil, err 262 } 263 264 defer dir.Close() 265 266 fileNames, err := dir.Readdirnames(0) 267 if err != nil { 268 return nil, err 269 } 270 271 keys := make([]string, 0, len(fileNames)) 272 for _, fileName := range fileNames { 273 keys = append(keys, path.Join(subPath, fileName)) 274 } 275 276 return keys, nil 277} 278 279// Move moves an object stored at sourcePath to destPath, removing the original 280// object. 281func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error { 282 source := d.fullPath(sourcePath) 283 dest := d.fullPath(destPath) 284 285 if _, err := os.Stat(source); os.IsNotExist(err) { 286 return storagedriver.PathNotFoundError{Path: sourcePath} 287 } 288 289 if err := os.MkdirAll(path.Dir(dest), 0755); err != nil { 290 return err 291 } 292 293 err := os.Rename(source, dest) 294 return err 295} 296 297// Delete recursively deletes all objects stored at "path" and its subpaths. 298func (d *driver) Delete(ctx context.Context, subPath string) error { 299 fullPath := d.fullPath(subPath) 300 301 _, err := os.Stat(fullPath) 302 if err != nil && !os.IsNotExist(err) { 303 return err 304 } else if err != nil { 305 return storagedriver.PathNotFoundError{Path: subPath} 306 } 307 308 err = os.RemoveAll(fullPath) 309 return err 310} 311 312// URLFor returns a URL which may be used to retrieve the content stored at the given path. 313// May return an UnsupportedMethodErr in certain StorageDriver implementations. 314func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { 315 return "", storagedriver.ErrUnsupportedMethod{} 316} 317 318// fullPath returns the absolute path of a key within the Driver's storage. 319func (d *driver) fullPath(subPath string) string { 320 return path.Join(d.rootDirectory, subPath) 321} 322 323type fileInfo struct { 324 os.FileInfo 325 path string 326} 327 328var _ storagedriver.FileInfo = fileInfo{} 329 330// Path provides the full path of the target of this file info. 331func (fi fileInfo) Path() string { 332 return fi.path 333} 334 335// Size returns current length in bytes of the file. The return value can 336// be used to write to the end of the file at path. The value is 337// meaningless if IsDir returns true. 338func (fi fileInfo) Size() int64 { 339 if fi.IsDir() { 340 return 0 341 } 342 343 return fi.FileInfo.Size() 344} 345 346// ModTime returns the modification time for the file. For backends that 347// don't have a modification time, the creation time should be returned. 348func (fi fileInfo) ModTime() time.Time { 349 return fi.FileInfo.ModTime() 350} 351 352// IsDir returns true if the path is a directory. 353func (fi fileInfo) IsDir() bool { 354 return fi.FileInfo.IsDir() 355} 356 357type fileWriter struct { 358 file *os.File 359 size int64 360 bw *bufio.Writer 361 closed bool 362 committed bool 363 cancelled bool 364} 365 366func newFileWriter(file *os.File, size int64) *fileWriter { 367 return &fileWriter{ 368 file: file, 369 size: size, 370 bw: bufio.NewWriter(file), 371 } 372} 373 374func (fw *fileWriter) Write(p []byte) (int, error) { 375 if fw.closed { 376 return 0, fmt.Errorf("already closed") 377 } else if fw.committed { 378 return 0, fmt.Errorf("already committed") 379 } else if fw.cancelled { 380 return 0, fmt.Errorf("already cancelled") 381 } 382 n, err := fw.bw.Write(p) 383 fw.size += int64(n) 384 return n, err 385} 386 387func (fw *fileWriter) Size() int64 { 388 return fw.size 389} 390 391func (fw *fileWriter) Close() error { 392 if fw.closed { 393 return fmt.Errorf("already closed") 394 } 395 396 if err := fw.bw.Flush(); err != nil { 397 return err 398 } 399 400 if err := fw.file.Sync(); err != nil { 401 return err 402 } 403 404 if err := fw.file.Close(); err != nil { 405 return err 406 } 407 fw.closed = true 408 return nil 409} 410 411func (fw *fileWriter) Cancel() error { 412 if fw.closed { 413 return fmt.Errorf("already closed") 414 } 415 416 fw.cancelled = true 417 fw.file.Close() 418 return os.Remove(fw.file.Name()) 419} 420 421func (fw *fileWriter) Commit() error { 422 if fw.closed { 423 return fmt.Errorf("already closed") 424 } else if fw.committed { 425 return fmt.Errorf("already committed") 426 } else if fw.cancelled { 427 return fmt.Errorf("already cancelled") 428 } 429 430 if err := fw.bw.Flush(); err != nil { 431 return err 432 } 433 434 if err := fw.file.Sync(); err != nil { 435 return err 436 } 437 438 fw.committed = true 439 return nil 440} 441