1package filesystem
2
3import (
4	"bufio"
5	"bytes"
6	"fmt"
7	"io"
8	"io/ioutil"
9	"os"
10	"path"
11	"reflect"
12	"strconv"
13	"time"
14
15	"github.com/docker/distribution/context"
16	storagedriver "github.com/docker/distribution/registry/storage/driver"
17	"github.com/docker/distribution/registry/storage/driver/base"
18	"github.com/docker/distribution/registry/storage/driver/factory"
19)
20
// Identity and default settings of the filesystem storage driver.
const (
	// driverName is the name this driver is registered under.
	driverName = "filesystem"

	// defaultRootDirectory is the storage root used when no
	// "rootdirectory" parameter is supplied.
	defaultRootDirectory = "/var/lib/registry"

	// defaultMaxThreads is the thread limit used when no "maxthreads"
	// parameter is supplied.
	defaultMaxThreads uint64 = 100

	// minThreads is the floor for the "maxthreads" parameter: a configured
	// value below it is raised to minThreads.
	minThreads uint64 = 25
)
31
// DriverParameters represents all configuration options available for the
// filesystem driver.
//
// RootDirectory is the directory every storage path is joined onto, and
// MaxThreads is the limit New passes to base.NewRegulator.
type DriverParameters struct {
	RootDirectory string
	MaxThreads    uint64
}
38
// init registers a filesystemDriverFactory with the storage driver factory
// package under driverName.
func init() {
	factory.Register(driverName, &filesystemDriverFactory{})
}
42
43// filesystemDriverFactory implements the factory.StorageDriverFactory interface
44type filesystemDriverFactory struct{}
45
46func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
47	return FromParameters(parameters)
48}
49
// driver is the unexported type that carries the storage driver methods of
// this package. rootDirectory is the directory that fullPath joins every
// sub-path onto.
type driver struct {
	rootDirectory string
}
53
// baseEmbed wraps base.Base in an unexported type, so that Driver embeds it
// under the unexported field name baseEmbed.
type baseEmbed struct {
	base.Base
}
57
// Driver is a storagedriver.StorageDriver implementation backed by a local
// filesystem. All provided paths will be subpaths of the RootDirectory.
//
// Driver has no fields of its own: it embeds baseEmbed, whose base.Base is
// populated by New with a regulator-wrapped *driver.
type Driver struct {
	baseEmbed
}
63
64// FromParameters constructs a new Driver with a given parameters map
65// Optional Parameters:
66// - rootdirectory
67// - maxthreads
68func FromParameters(parameters map[string]interface{}) (*Driver, error) {
69	params, err := fromParametersImpl(parameters)
70	if err != nil || params == nil {
71		return nil, err
72	}
73	return New(*params), nil
74}
75
76func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
77	var (
78		err           error
79		maxThreads    = defaultMaxThreads
80		rootDirectory = defaultRootDirectory
81	)
82
83	if parameters != nil {
84		if rootDir, ok := parameters["rootdirectory"]; ok {
85			rootDirectory = fmt.Sprint(rootDir)
86		}
87
88		// Get maximum number of threads for blocking filesystem operations,
89		// if specified
90		threads := parameters["maxthreads"]
91		switch v := threads.(type) {
92		case string:
93			if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil {
94				return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads)
95			}
96		case uint64:
97			maxThreads = v
98		case int, int32, int64:
99			val := reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int()
100			// If threads is negative casting to uint64 will wrap around and
101			// give you the hugest thread limit ever. Let's be sensible, here
102			if val > 0 {
103				maxThreads = uint64(val)
104			}
105		case uint, uint32:
106			maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint()
107		case nil:
108			// do nothing
109		default:
110			return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads)
111		}
112
113		if maxThreads < minThreads {
114			maxThreads = minThreads
115		}
116	}
117
118	params := &DriverParameters{
119		RootDirectory: rootDirectory,
120		MaxThreads:    maxThreads,
121	}
122	return params, nil
123}
124
125// New constructs a new Driver with a given rootDirectory
126func New(params DriverParameters) *Driver {
127	fsDriver := &driver{rootDirectory: params.RootDirectory}
128
129	return &Driver{
130		baseEmbed: baseEmbed{
131			Base: base.Base{
132				StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
133			},
134		},
135	}
136}
137
138// Implement the storagedriver.StorageDriver interface
139
// Name returns driverName, the name this driver is registered under.
func (d *driver) Name() string {
	return driverName
}
143
144// GetContent retrieves the content stored at "path" as a []byte.
145func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
146	rc, err := d.Reader(ctx, path, 0)
147	if err != nil {
148		return nil, err
149	}
150	defer rc.Close()
151
152	p, err := ioutil.ReadAll(rc)
153	if err != nil {
154		return nil, err
155	}
156
157	return p, nil
158}
159
160// PutContent stores the []byte content at a location designated by "path".
161func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error {
162	writer, err := d.Writer(ctx, subPath, false)
163	if err != nil {
164		return err
165	}
166	defer writer.Close()
167	_, err = io.Copy(writer, bytes.NewReader(contents))
168	if err != nil {
169		writer.Cancel()
170		return err
171	}
172	return writer.Commit()
173}
174
175// Reader retrieves an io.ReadCloser for the content stored at "path" with a
176// given byte offset.
177func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
178	file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
179	if err != nil {
180		if os.IsNotExist(err) {
181			return nil, storagedriver.PathNotFoundError{Path: path}
182		}
183
184		return nil, err
185	}
186
187	seekPos, err := file.Seek(int64(offset), os.SEEK_SET)
188	if err != nil {
189		file.Close()
190		return nil, err
191	} else if seekPos < int64(offset) {
192		file.Close()
193		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
194	}
195
196	return file, nil
197}
198
199func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) {
200	fullPath := d.fullPath(subPath)
201	parentDir := path.Dir(fullPath)
202	if err := os.MkdirAll(parentDir, 0777); err != nil {
203		return nil, err
204	}
205
206	fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0666)
207	if err != nil {
208		return nil, err
209	}
210
211	var offset int64
212
213	if !append {
214		err := fp.Truncate(0)
215		if err != nil {
216			fp.Close()
217			return nil, err
218		}
219	} else {
220		n, err := fp.Seek(0, os.SEEK_END)
221		if err != nil {
222			fp.Close()
223			return nil, err
224		}
225		offset = int64(n)
226	}
227
228	return newFileWriter(fp, offset), nil
229}
230
231// Stat retrieves the FileInfo for the given path, including the current size
232// in bytes and the creation time.
233func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) {
234	fullPath := d.fullPath(subPath)
235
236	fi, err := os.Stat(fullPath)
237	if err != nil {
238		if os.IsNotExist(err) {
239			return nil, storagedriver.PathNotFoundError{Path: subPath}
240		}
241
242		return nil, err
243	}
244
245	return fileInfo{
246		path:     subPath,
247		FileInfo: fi,
248	}, nil
249}
250
251// List returns a list of the objects that are direct descendants of the given
252// path.
253func (d *driver) List(ctx context.Context, subPath string) ([]string, error) {
254	fullPath := d.fullPath(subPath)
255
256	dir, err := os.Open(fullPath)
257	if err != nil {
258		if os.IsNotExist(err) {
259			return nil, storagedriver.PathNotFoundError{Path: subPath}
260		}
261		return nil, err
262	}
263
264	defer dir.Close()
265
266	fileNames, err := dir.Readdirnames(0)
267	if err != nil {
268		return nil, err
269	}
270
271	keys := make([]string, 0, len(fileNames))
272	for _, fileName := range fileNames {
273		keys = append(keys, path.Join(subPath, fileName))
274	}
275
276	return keys, nil
277}
278
279// Move moves an object stored at sourcePath to destPath, removing the original
280// object.
281func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
282	source := d.fullPath(sourcePath)
283	dest := d.fullPath(destPath)
284
285	if _, err := os.Stat(source); os.IsNotExist(err) {
286		return storagedriver.PathNotFoundError{Path: sourcePath}
287	}
288
289	if err := os.MkdirAll(path.Dir(dest), 0755); err != nil {
290		return err
291	}
292
293	err := os.Rename(source, dest)
294	return err
295}
296
297// Delete recursively deletes all objects stored at "path" and its subpaths.
298func (d *driver) Delete(ctx context.Context, subPath string) error {
299	fullPath := d.fullPath(subPath)
300
301	_, err := os.Stat(fullPath)
302	if err != nil && !os.IsNotExist(err) {
303		return err
304	} else if err != nil {
305		return storagedriver.PathNotFoundError{Path: subPath}
306	}
307
308	err = os.RemoveAll(fullPath)
309	return err
310}
311
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// This driver does not support it: the call always returns an empty string
// and storagedriver.ErrUnsupportedMethod, ignoring path and options.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
	return "", storagedriver.ErrUnsupportedMethod{}
}
317
// fullPath returns the filesystem path of a key within the Driver's storage:
// subPath joined onto rootDirectory with path.Join. The result is absolute
// only if rootDirectory is.
func (d *driver) fullPath(subPath string) string {
	return path.Join(d.rootDirectory, subPath)
}
322
// fileInfo pairs an os.FileInfo with the path it was requested for (Stat
// stores the caller-supplied sub-path there).
type fileInfo struct {
	os.FileInfo
	path string
}

// Compile-time check that fileInfo satisfies storagedriver.FileInfo.
var _ storagedriver.FileInfo = fileInfo{}
329
// Path returns the path this file info was created for. Stat fills it with
// the sub-path passed by the caller, not the on-disk path under the root
// directory.
func (fi fileInfo) Path() string {
	return fi.path
}
334
335// Size returns current length in bytes of the file. The return value can
336// be used to write to the end of the file at path. The value is
337// meaningless if IsDir returns true.
338func (fi fileInfo) Size() int64 {
339	if fi.IsDir() {
340		return 0
341	}
342
343	return fi.FileInfo.Size()
344}
345
// ModTime returns the modification time for the file, as reported by the
// wrapped os.FileInfo.
func (fi fileInfo) ModTime() time.Time {
	return fi.FileInfo.ModTime()
}
351
// IsDir returns true if the path is a directory, as reported by the wrapped
// os.FileInfo.
func (fi fileInfo) IsDir() bool {
	return fi.FileInfo.IsDir()
}
356
// fileWriter writes to a single file through a bufio.Writer and tracks the
// writer's lifecycle.
//
// file is the destination and bw buffers writes to it. size starts at the
// value given to newFileWriter and grows by the number of bytes Write
// accepts. closed, committed and cancelled are set by Close, Commit and
// Cancel respectively and are checked by the methods before doing any work.
type fileWriter struct {
	file      *os.File
	size      int64
	bw        *bufio.Writer
	closed    bool
	committed bool
	cancelled bool
}
365
366func newFileWriter(file *os.File, size int64) *fileWriter {
367	return &fileWriter{
368		file: file,
369		size: size,
370		bw:   bufio.NewWriter(file),
371	}
372}
373
374func (fw *fileWriter) Write(p []byte) (int, error) {
375	if fw.closed {
376		return 0, fmt.Errorf("already closed")
377	} else if fw.committed {
378		return 0, fmt.Errorf("already committed")
379	} else if fw.cancelled {
380		return 0, fmt.Errorf("already cancelled")
381	}
382	n, err := fw.bw.Write(p)
383	fw.size += int64(n)
384	return n, err
385}
386
// Size returns the writer's current size: the initial size given to
// newFileWriter plus the bytes accepted by Write so far.
func (fw *fileWriter) Size() int64 {
	return fw.size
}
390
391func (fw *fileWriter) Close() error {
392	if fw.closed {
393		return fmt.Errorf("already closed")
394	}
395
396	if err := fw.bw.Flush(); err != nil {
397		return err
398	}
399
400	if err := fw.file.Sync(); err != nil {
401		return err
402	}
403
404	if err := fw.file.Close(); err != nil {
405		return err
406	}
407	fw.closed = true
408	return nil
409}
410
411func (fw *fileWriter) Cancel() error {
412	if fw.closed {
413		return fmt.Errorf("already closed")
414	}
415
416	fw.cancelled = true
417	fw.file.Close()
418	return os.Remove(fw.file.Name())
419}
420
421func (fw *fileWriter) Commit() error {
422	if fw.closed {
423		return fmt.Errorf("already closed")
424	} else if fw.committed {
425		return fmt.Errorf("already committed")
426	} else if fw.cancelled {
427		return fmt.Errorf("already cancelled")
428	}
429
430	if err := fw.bw.Flush(); err != nil {
431		return err
432	}
433
434	if err := fw.file.Sync(); err != nil {
435		return err
436	}
437
438	fw.committed = true
439	return nil
440}
441