1package filesystem
2
3import (
4	"bufio"
5	"bytes"
6	"context"
7	"fmt"
8	"io"
9	"io/ioutil"
10	"os"
11	"path"
12	"time"
13
14	storagedriver "github.com/docker/distribution/registry/storage/driver"
15	"github.com/docker/distribution/registry/storage/driver/base"
16	"github.com/docker/distribution/registry/storage/driver/factory"
17)
18
19const (
20	driverName           = "filesystem"
21	defaultRootDirectory = "/var/lib/registry"
22	defaultMaxThreads    = uint64(100)
23
24	// minThreads is the minimum value for the maxthreads configuration
25	// parameter. If the driver's parameters are less than this we set
26	// the parameters to minThreads
27	minThreads = uint64(25)
28)
29
30// DriverParameters represents all configuration options available for the
31// filesystem driver
32type DriverParameters struct {
33	RootDirectory string
34	MaxThreads    uint64
35}
36
37func init() {
38	factory.Register(driverName, &filesystemDriverFactory{})
39}
40
41// filesystemDriverFactory implements the factory.StorageDriverFactory interface
42type filesystemDriverFactory struct{}
43
44func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
45	return FromParameters(parameters)
46}
47
48type driver struct {
49	rootDirectory string
50}
51
52type baseEmbed struct {
53	base.Base
54}
55
56// Driver is a storagedriver.StorageDriver implementation backed by a local
57// filesystem. All provided paths will be subpaths of the RootDirectory.
58type Driver struct {
59	baseEmbed
60}
61
62// FromParameters constructs a new Driver with a given parameters map
63// Optional Parameters:
64// - rootdirectory
65// - maxthreads
66func FromParameters(parameters map[string]interface{}) (*Driver, error) {
67	params, err := fromParametersImpl(parameters)
68	if err != nil || params == nil {
69		return nil, err
70	}
71	return New(*params), nil
72}
73
74func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
75	var (
76		err           error
77		maxThreads    = defaultMaxThreads
78		rootDirectory = defaultRootDirectory
79	)
80
81	if parameters != nil {
82		if rootDir, ok := parameters["rootdirectory"]; ok {
83			rootDirectory = fmt.Sprint(rootDir)
84		}
85
86		maxThreads, err = base.GetLimitFromParameter(parameters["maxthreads"], minThreads, defaultMaxThreads)
87		if err != nil {
88			return nil, fmt.Errorf("maxthreads config error: %s", err.Error())
89		}
90	}
91
92	params := &DriverParameters{
93		RootDirectory: rootDirectory,
94		MaxThreads:    maxThreads,
95	}
96	return params, nil
97}
98
99// New constructs a new Driver with a given rootDirectory
100func New(params DriverParameters) *Driver {
101	fsDriver := &driver{rootDirectory: params.RootDirectory}
102
103	return &Driver{
104		baseEmbed: baseEmbed{
105			Base: base.Base{
106				StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
107			},
108		},
109	}
110}
111
112// Implement the storagedriver.StorageDriver interface
113
114func (d *driver) Name() string {
115	return driverName
116}
117
118// GetContent retrieves the content stored at "path" as a []byte.
119func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
120	rc, err := d.Reader(ctx, path, 0)
121	if err != nil {
122		return nil, err
123	}
124	defer rc.Close()
125
126	p, err := ioutil.ReadAll(rc)
127	if err != nil {
128		return nil, err
129	}
130
131	return p, nil
132}
133
134// PutContent stores the []byte content at a location designated by "path".
135func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error {
136	writer, err := d.Writer(ctx, subPath, false)
137	if err != nil {
138		return err
139	}
140	defer writer.Close()
141	_, err = io.Copy(writer, bytes.NewReader(contents))
142	if err != nil {
143		writer.Cancel()
144		return err
145	}
146	return writer.Commit()
147}
148
149// Reader retrieves an io.ReadCloser for the content stored at "path" with a
150// given byte offset.
151func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
152	file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644)
153	if err != nil {
154		if os.IsNotExist(err) {
155			return nil, storagedriver.PathNotFoundError{Path: path}
156		}
157
158		return nil, err
159	}
160
161	seekPos, err := file.Seek(offset, io.SeekStart)
162	if err != nil {
163		file.Close()
164		return nil, err
165	} else if seekPos < offset {
166		file.Close()
167		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
168	}
169
170	return file, nil
171}
172
173func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) {
174	fullPath := d.fullPath(subPath)
175	parentDir := path.Dir(fullPath)
176	if err := os.MkdirAll(parentDir, 0777); err != nil {
177		return nil, err
178	}
179
180	fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0666)
181	if err != nil {
182		return nil, err
183	}
184
185	var offset int64
186
187	if !append {
188		err := fp.Truncate(0)
189		if err != nil {
190			fp.Close()
191			return nil, err
192		}
193	} else {
194		n, err := fp.Seek(0, io.SeekEnd)
195		if err != nil {
196			fp.Close()
197			return nil, err
198		}
199		offset = n
200	}
201
202	return newFileWriter(fp, offset), nil
203}
204
205// Stat retrieves the FileInfo for the given path, including the current size
206// in bytes and the creation time.
207func (d *driver) Stat(ctx context.Context, subPath string) (storagedriver.FileInfo, error) {
208	fullPath := d.fullPath(subPath)
209
210	fi, err := os.Stat(fullPath)
211	if err != nil {
212		if os.IsNotExist(err) {
213			return nil, storagedriver.PathNotFoundError{Path: subPath}
214		}
215
216		return nil, err
217	}
218
219	return fileInfo{
220		path:     subPath,
221		FileInfo: fi,
222	}, nil
223}
224
225// List returns a list of the objects that are direct descendants of the given
226// path.
227func (d *driver) List(ctx context.Context, subPath string) ([]string, error) {
228	fullPath := d.fullPath(subPath)
229
230	dir, err := os.Open(fullPath)
231	if err != nil {
232		if os.IsNotExist(err) {
233			return nil, storagedriver.PathNotFoundError{Path: subPath}
234		}
235		return nil, err
236	}
237
238	defer dir.Close()
239
240	fileNames, err := dir.Readdirnames(0)
241	if err != nil {
242		return nil, err
243	}
244
245	keys := make([]string, 0, len(fileNames))
246	for _, fileName := range fileNames {
247		keys = append(keys, path.Join(subPath, fileName))
248	}
249
250	return keys, nil
251}
252
253// Move moves an object stored at sourcePath to destPath, removing the original
254// object.
255func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
256	source := d.fullPath(sourcePath)
257	dest := d.fullPath(destPath)
258
259	if _, err := os.Stat(source); os.IsNotExist(err) {
260		return storagedriver.PathNotFoundError{Path: sourcePath}
261	}
262
263	if err := os.MkdirAll(path.Dir(dest), 0755); err != nil {
264		return err
265	}
266
267	err := os.Rename(source, dest)
268	return err
269}
270
271// Delete recursively deletes all objects stored at "path" and its subpaths.
272func (d *driver) Delete(ctx context.Context, subPath string) error {
273	fullPath := d.fullPath(subPath)
274
275	_, err := os.Stat(fullPath)
276	if err != nil && !os.IsNotExist(err) {
277		return err
278	} else if err != nil {
279		return storagedriver.PathNotFoundError{Path: subPath}
280	}
281
282	err = os.RemoveAll(fullPath)
283	return err
284}
285
286// URLFor returns a URL which may be used to retrieve the content stored at the given path.
287// May return an UnsupportedMethodErr in certain StorageDriver implementations.
288func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
289	return "", storagedriver.ErrUnsupportedMethod{}
290}
291
292// Walk traverses a filesystem defined within driver, starting
293// from the given path, calling f on each file
294func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
295	return storagedriver.WalkFallback(ctx, d, path, f)
296}
297
298// fullPath returns the absolute path of a key within the Driver's storage.
299func (d *driver) fullPath(subPath string) string {
300	return path.Join(d.rootDirectory, subPath)
301}
302
303type fileInfo struct {
304	os.FileInfo
305	path string
306}
307
308var _ storagedriver.FileInfo = fileInfo{}
309
310// Path provides the full path of the target of this file info.
311func (fi fileInfo) Path() string {
312	return fi.path
313}
314
315// Size returns current length in bytes of the file. The return value can
316// be used to write to the end of the file at path. The value is
317// meaningless if IsDir returns true.
318func (fi fileInfo) Size() int64 {
319	if fi.IsDir() {
320		return 0
321	}
322
323	return fi.FileInfo.Size()
324}
325
326// ModTime returns the modification time for the file. For backends that
327// don't have a modification time, the creation time should be returned.
328func (fi fileInfo) ModTime() time.Time {
329	return fi.FileInfo.ModTime()
330}
331
332// IsDir returns true if the path is a directory.
333func (fi fileInfo) IsDir() bool {
334	return fi.FileInfo.IsDir()
335}
336
337type fileWriter struct {
338	file      *os.File
339	size      int64
340	bw        *bufio.Writer
341	closed    bool
342	committed bool
343	cancelled bool
344}
345
346func newFileWriter(file *os.File, size int64) *fileWriter {
347	return &fileWriter{
348		file: file,
349		size: size,
350		bw:   bufio.NewWriter(file),
351	}
352}
353
354func (fw *fileWriter) Write(p []byte) (int, error) {
355	if fw.closed {
356		return 0, fmt.Errorf("already closed")
357	} else if fw.committed {
358		return 0, fmt.Errorf("already committed")
359	} else if fw.cancelled {
360		return 0, fmt.Errorf("already cancelled")
361	}
362	n, err := fw.bw.Write(p)
363	fw.size += int64(n)
364	return n, err
365}
366
367func (fw *fileWriter) Size() int64 {
368	return fw.size
369}
370
371func (fw *fileWriter) Close() error {
372	if fw.closed {
373		return fmt.Errorf("already closed")
374	}
375
376	if err := fw.bw.Flush(); err != nil {
377		return err
378	}
379
380	if err := fw.file.Sync(); err != nil {
381		return err
382	}
383
384	if err := fw.file.Close(); err != nil {
385		return err
386	}
387	fw.closed = true
388	return nil
389}
390
391func (fw *fileWriter) Cancel() error {
392	if fw.closed {
393		return fmt.Errorf("already closed")
394	}
395
396	fw.cancelled = true
397	fw.file.Close()
398	return os.Remove(fw.file.Name())
399}
400
401func (fw *fileWriter) Commit() error {
402	if fw.closed {
403		return fmt.Errorf("already closed")
404	} else if fw.committed {
405		return fmt.Errorf("already committed")
406	} else if fw.cancelled {
407		return fmt.Errorf("already cancelled")
408	}
409
410	if err := fw.bw.Flush(); err != nil {
411		return err
412	}
413
414	if err := fw.file.Sync(); err != nil {
415		return err
416	}
417
418	fw.committed = true
419	return nil
420}
421