1// mgo - MongoDB driver for Go
2//
3// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
4//
5// All rights reserved.
6//
7// Redistribution and use in source and binary forms, with or without
8// modification, are permitted provided that the following conditions are met:
9//
10// 1. Redistributions of source code must retain the above copyright notice, this
11//    list of conditions and the following disclaimer.
12// 2. Redistributions in binary form must reproduce the above copyright notice,
13//    this list of conditions and the following disclaimer in the documentation
14//    and/or other materials provided with the distribution.
15//
16// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27package mgo
28
29import (
30	"crypto/md5"
31	"encoding/hex"
32	"errors"
33	"hash"
34	"io"
35	"os"
36	"sync"
37	"time"
38
39	"gopkg.in/mgo.v2/bson"
40)
41
// GridFS stores and retrieves files using a pair of collections:
// Files ("<prefix>.files") holds one metadata document per file, and
// Chunks ("<prefix>.chunks") holds the file content split into chunk
// documents (see newGridFS).
type GridFS struct {
	Files  *Collection
	Chunks *Collection
}
46
// gfsFileMode tracks whether a GridFile is closed, open for reading,
// or open for writing.
type gfsFileMode int

const (
	gfsClosed  gfsFileMode = 0
	gfsReading gfsFileMode = 1
	gfsWriting gfsFileMode = 2
)
54
// GridFile represents an open file in GridFS, for either reading or
// writing. Mutable state is guarded by m; c signals completion of
// background chunk insertions (see insertChunk and completeWrite).
type GridFile struct {
	m    sync.Mutex
	c    sync.Cond // L is m; broadcast when a background chunk insert finishes
	gfs  *GridFS
	mode gfsFileMode
	err  error // first error observed; sticky, returned by Close

	chunk  int   // next chunk number to read or write
	offset int64 // current read offset within the file

	wpending int       // number of chunk inserts still in flight
	wbuf     []byte    // buffered write data not yet forming a full chunk
	wsum     hash.Hash // running MD5 of everything written

	rbuf   []byte          // unread remainder of the current chunk
	rcache *gfsCachedChunk // next chunk, being fetched in the background

	doc gfsFile // document stored in the files collection on Close
}
74
// gfsFile is the schema of a document in the "<prefix>.files"
// collection. The string literals are bson field tags.
type gfsFile struct {
	Id          interface{} "_id"
	ChunkSize   int         "chunkSize"
	UploadDate  time.Time   "uploadDate"
	Length      int64       ",minsize"
	MD5         string
	Filename    string    ",omitempty"
	ContentType string    "contentType,omitempty"
	Metadata    *bson.Raw ",omitempty"
}
85
// gfsChunk is the schema of a document in the "<prefix>.chunks"
// collection: block number N of the data for the file with id FilesId.
type gfsChunk struct {
	Id      interface{} "_id"
	FilesId interface{} "files_id"
	N       int
	Data    []byte
}
92
// gfsCachedChunk holds a chunk being fetched in the background by
// getChunk. The wait mutex is held while the fetch is in flight and
// released once n, data, and err are populated, so a reader blocks on
// wait.Lock() until the chunk is ready.
type gfsCachedChunk struct {
	wait sync.Mutex
	n    int
	data []byte
	err  error
}
99
100func newGridFS(db *Database, prefix string) *GridFS {
101	return &GridFS{db.C(prefix + ".files"), db.C(prefix + ".chunks")}
102}
103
104func (gfs *GridFS) newFile() *GridFile {
105	file := &GridFile{gfs: gfs}
106	file.c.L = &file.m
107	//runtime.SetFinalizer(file, finalizeFile)
108	return file
109}
110
111func finalizeFile(file *GridFile) {
112	file.Close()
113}
114
115// Create creates a new file with the provided name in the GridFS.  If the file
116// name already exists, a new version will be inserted with an up-to-date
117// uploadDate that will cause it to be atomically visible to the Open and
118// OpenId methods.  If the file name is not important, an empty name may be
119// provided and the file Id used instead.
120//
121// It's important to Close files whether they are being written to
122// or read from, and to check the err result to ensure the operation
123// completed successfully.
124//
125// A simple example inserting a new file:
126//
127//     func check(err error) {
128//         if err != nil {
129//             panic(err.String())
130//         }
131//     }
132//     file, err := db.GridFS("fs").Create("myfile.txt")
133//     check(err)
134//     n, err := file.Write([]byte("Hello world!"))
135//     check(err)
136//     err = file.Close()
137//     check(err)
138//     fmt.Printf("%d bytes written\n", n)
139//
140// The io.Writer interface is implemented by *GridFile and may be used to
141// help on the file creation.  For example:
142//
143//     file, err := db.GridFS("fs").Create("myfile.txt")
144//     check(err)
145//     messages, err := os.Open("/var/log/messages")
146//     check(err)
147//     defer messages.Close()
148//     err = io.Copy(file, messages)
149//     check(err)
150//     err = file.Close()
151//     check(err)
152//
153func (gfs *GridFS) Create(name string) (file *GridFile, err error) {
154	file = gfs.newFile()
155	file.mode = gfsWriting
156	file.wsum = md5.New()
157	file.doc = gfsFile{Id: bson.NewObjectId(), ChunkSize: 255 * 1024, Filename: name}
158	return
159}
160
161// OpenId returns the file with the provided id, for reading.
162// If the file isn't found, err will be set to mgo.ErrNotFound.
163//
164// It's important to Close files whether they are being written to
165// or read from, and to check the err result to ensure the operation
166// completed successfully.
167//
168// The following example will print the first 8192 bytes from the file:
169//
170//     func check(err error) {
171//         if err != nil {
172//             panic(err.String())
173//         }
174//     }
175//     file, err := db.GridFS("fs").OpenId(objid)
176//     check(err)
177//     b := make([]byte, 8192)
178//     n, err := file.Read(b)
179//     check(err)
180//     fmt.Println(string(b))
181//     check(err)
182//     err = file.Close()
183//     check(err)
184//     fmt.Printf("%d bytes read\n", n)
185//
186// The io.Reader interface is implemented by *GridFile and may be used to
187// deal with it.  As an example, the following snippet will dump the whole
188// file into the standard output:
189//
190//     file, err := db.GridFS("fs").OpenId(objid)
191//     check(err)
192//     err = io.Copy(os.Stdout, file)
193//     check(err)
194//     err = file.Close()
195//     check(err)
196//
197func (gfs *GridFS) OpenId(id interface{}) (file *GridFile, err error) {
198	var doc gfsFile
199	err = gfs.Files.Find(bson.M{"_id": id}).One(&doc)
200	if err != nil {
201		return
202	}
203	file = gfs.newFile()
204	file.mode = gfsReading
205	file.doc = doc
206	return
207}
208
209// Open returns the most recently uploaded file with the provided
210// name, for reading. If the file isn't found, err will be set
211// to mgo.ErrNotFound.
212//
213// It's important to Close files whether they are being written to
214// or read from, and to check the err result to ensure the operation
215// completed successfully.
216//
217// The following example will print the first 8192 bytes from the file:
218//
219//     file, err := db.GridFS("fs").Open("myfile.txt")
220//     check(err)
221//     b := make([]byte, 8192)
222//     n, err := file.Read(b)
223//     check(err)
224//     fmt.Println(string(b))
225//     check(err)
226//     err = file.Close()
227//     check(err)
228//     fmt.Printf("%d bytes read\n", n)
229//
230// The io.Reader interface is implemented by *GridFile and may be used to
231// deal with it.  As an example, the following snippet will dump the whole
232// file into the standard output:
233//
234//     file, err := db.GridFS("fs").Open("myfile.txt")
235//     check(err)
236//     err = io.Copy(os.Stdout, file)
237//     check(err)
238//     err = file.Close()
239//     check(err)
240//
241func (gfs *GridFS) Open(name string) (file *GridFile, err error) {
242	var doc gfsFile
243	err = gfs.Files.Find(bson.M{"filename": name}).Sort("-uploadDate").One(&doc)
244	if err != nil {
245		return
246	}
247	file = gfs.newFile()
248	file.mode = gfsReading
249	file.doc = doc
250	return
251}
252
253// OpenNext opens the next file from iter for reading, sets *file to it,
254// and returns true on the success case. If no more documents are available
255// on iter or an error occurred, *file is set to nil and the result is false.
256// Errors will be available via iter.Err().
257//
258// The iter parameter must be an iterator on the GridFS files collection.
259// Using the GridFS.Find method is an easy way to obtain such an iterator,
260// but any iterator on the collection will work.
261//
262// If the provided *file is non-nil, OpenNext will close it before attempting
263// to iterate to the next element. This means that in a loop one only
264// has to worry about closing files when breaking out of the loop early
265// (break, return, or panic).
266//
267// For example:
268//
269//     gfs := db.GridFS("fs")
270//     query := gfs.Find(nil).Sort("filename")
271//     iter := query.Iter()
272//     var f *mgo.GridFile
273//     for gfs.OpenNext(iter, &f) {
274//         fmt.Printf("Filename: %s\n", f.Name())
275//     }
276//     if iter.Close() != nil {
277//         panic(iter.Close())
278//     }
279//
280func (gfs *GridFS) OpenNext(iter *Iter, file **GridFile) bool {
281	if *file != nil {
282		// Ignoring the error here shouldn't be a big deal
283		// as we're reading the file and the loop iteration
284		// for this file is finished.
285		_ = (*file).Close()
286	}
287	var doc gfsFile
288	if !iter.Next(&doc) {
289		*file = nil
290		return false
291	}
292	f := gfs.newFile()
293	f.mode = gfsReading
294	f.doc = doc
295	*file = f
296	return true
297}
298
299// Find runs query on GridFS's files collection and returns
300// the resulting Query.
301//
302// This logic:
303//
304//     gfs := db.GridFS("fs")
305//     iter := gfs.Find(nil).Iter()
306//
307// Is equivalent to:
308//
309//     files := db.C("fs" + ".files")
310//     iter := files.Find(nil).Iter()
311//
312func (gfs *GridFS) Find(query interface{}) *Query {
313	return gfs.Files.Find(query)
314}
315
316// RemoveId deletes the file with the provided id from the GridFS.
317func (gfs *GridFS) RemoveId(id interface{}) error {
318	err := gfs.Files.Remove(bson.M{"_id": id})
319	if err != nil {
320		return err
321	}
322	_, err = gfs.Chunks.RemoveAll(bson.D{{"files_id", id}})
323	return err
324}
325
// gfsDocId projects just the _id field of a files document; used by
// Remove to iterate over every version of a named file.
type gfsDocId struct {
	Id interface{} "_id"
}
329
330// Remove deletes all files with the provided name from the GridFS.
331func (gfs *GridFS) Remove(name string) (err error) {
332	iter := gfs.Files.Find(bson.M{"filename": name}).Select(bson.M{"_id": 1}).Iter()
333	var doc gfsDocId
334	for iter.Next(&doc) {
335		if e := gfs.RemoveId(doc.Id); e != nil {
336			err = e
337		}
338	}
339	if err == nil {
340		err = iter.Close()
341	}
342	return err
343}
344
345func (file *GridFile) assertMode(mode gfsFileMode) {
346	switch file.mode {
347	case mode:
348		return
349	case gfsWriting:
350		panic("GridFile is open for writing")
351	case gfsReading:
352		panic("GridFile is open for reading")
353	case gfsClosed:
354		panic("GridFile is closed")
355	default:
356		panic("internal error: missing GridFile mode")
357	}
358}
359
360// SetChunkSize sets size of saved chunks.  Once the file is written to, it
361// will be split in blocks of that size and each block saved into an
362// independent chunk document.  The default chunk size is 255kb.
363//
364// It is a runtime error to call this function once the file has started
365// being written to.
366func (file *GridFile) SetChunkSize(bytes int) {
367	file.assertMode(gfsWriting)
368	debugf("GridFile %p: setting chunk size to %d", file, bytes)
369	file.m.Lock()
370	file.doc.ChunkSize = bytes
371	file.m.Unlock()
372}
373
374// Id returns the current file Id.
375func (file *GridFile) Id() interface{} {
376	return file.doc.Id
377}
378
379// SetId changes the current file Id.
380//
381// It is a runtime error to call this function once the file has started
382// being written to, or when the file is not open for writing.
383func (file *GridFile) SetId(id interface{}) {
384	file.assertMode(gfsWriting)
385	file.m.Lock()
386	file.doc.Id = id
387	file.m.Unlock()
388}
389
390// Name returns the optional file name.  An empty string will be returned
391// in case it is unset.
392func (file *GridFile) Name() string {
393	return file.doc.Filename
394}
395
396// SetName changes the optional file name.  An empty string may be used to
397// unset it.
398//
399// It is a runtime error to call this function when the file is not open
400// for writing.
401func (file *GridFile) SetName(name string) {
402	file.assertMode(gfsWriting)
403	file.m.Lock()
404	file.doc.Filename = name
405	file.m.Unlock()
406}
407
408// ContentType returns the optional file content type.  An empty string will be
409// returned in case it is unset.
410func (file *GridFile) ContentType() string {
411	return file.doc.ContentType
412}
413
414// ContentType changes the optional file content type.  An empty string may be
415// used to unset it.
416//
417// It is a runtime error to call this function when the file is not open
418// for writing.
419func (file *GridFile) SetContentType(ctype string) {
420	file.assertMode(gfsWriting)
421	file.m.Lock()
422	file.doc.ContentType = ctype
423	file.m.Unlock()
424}
425
426// GetMeta unmarshals the optional "metadata" field associated with the
427// file into the result parameter. The meaning of keys under that field
428// is user-defined. For example:
429//
430//     result := struct{ INode int }{}
431//     err = file.GetMeta(&result)
432//     if err != nil {
433//         panic(err.String())
434//     }
435//     fmt.Printf("inode: %d\n", result.INode)
436//
437func (file *GridFile) GetMeta(result interface{}) (err error) {
438	file.m.Lock()
439	if file.doc.Metadata != nil {
440		err = bson.Unmarshal(file.doc.Metadata.Data, result)
441	}
442	file.m.Unlock()
443	return
444}
445
446// SetMeta changes the optional "metadata" field associated with the
447// file. The meaning of keys under that field is user-defined.
448// For example:
449//
450//     file.SetMeta(bson.M{"inode": inode})
451//
452// It is a runtime error to call this function when the file is not open
453// for writing.
454func (file *GridFile) SetMeta(metadata interface{}) {
455	file.assertMode(gfsWriting)
456	data, err := bson.Marshal(metadata)
457	file.m.Lock()
458	if err != nil && file.err == nil {
459		file.err = err
460	} else {
461		file.doc.Metadata = &bson.Raw{Data: data}
462	}
463	file.m.Unlock()
464}
465
466// Size returns the file size in bytes.
467func (file *GridFile) Size() (bytes int64) {
468	file.m.Lock()
469	bytes = file.doc.Length
470	file.m.Unlock()
471	return
472}
473
474// MD5 returns the file MD5 as a hex-encoded string.
475func (file *GridFile) MD5() (md5 string) {
476	return file.doc.MD5
477}
478
479// UploadDate returns the file upload time.
480func (file *GridFile) UploadDate() time.Time {
481	return file.doc.UploadDate
482}
483
484// SetUploadDate changes the file upload time.
485//
486// It is a runtime error to call this function when the file is not open
487// for writing.
488func (file *GridFile) SetUploadDate(t time.Time) {
489	file.assertMode(gfsWriting)
490	file.m.Lock()
491	file.doc.UploadDate = t
492	file.m.Unlock()
493}
494
// Close flushes any pending changes in case the file is being written
// to, waits for any background operations to finish, and closes the file.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
func (file *GridFile) Close() (err error) {
	file.m.Lock()
	defer file.m.Unlock()
	if file.mode == gfsWriting {
		// Flush the partial chunk left in the write buffer, then wait
		// for in-flight inserts and store the files document.
		if len(file.wbuf) > 0 && file.err == nil {
			file.insertChunk(file.wbuf)
			file.wbuf = file.wbuf[0:0]
		}
		file.completeWrite()
	} else if file.mode == gfsReading && file.rcache != nil {
		// Locking wait blocks until the background read-ahead fetch
		// has finished (it unlocks when done), so the cache can be
		// dropped without racing that goroutine.
		file.rcache.wait.Lock()
		file.rcache = nil
	}
	file.mode = gfsClosed
	debugf("GridFile %p: closed", file)
	return file.err
}
518
// completeWrite finishes a write session: it waits for all pending
// background chunk inserts, then inserts the files document with the
// accumulated MD5. On any failure, previously written chunks are
// removed so no orphaned data remains. Must be called with file.m
// held; c.Wait releases it while sleeping.
func (file *GridFile) completeWrite() {
	for file.wpending > 0 {
		debugf("GridFile %p: waiting for %d pending chunks to complete file write", file, file.wpending)
		file.c.Wait()
	}
	if file.err == nil {
		hexsum := hex.EncodeToString(file.wsum.Sum(nil))
		// Preserve an upload date explicitly set via SetUploadDate.
		if file.doc.UploadDate.IsZero() {
			file.doc.UploadDate = bson.Now()
		}
		file.doc.MD5 = hexsum
		file.err = file.gfs.Files.Insert(file.doc)
	}
	if file.err != nil {
		// Best-effort cleanup of already-written chunks; the removal
		// error is deliberately ignored (file.err is already set).
		file.gfs.Chunks.RemoveAll(bson.D{{"files_id", file.doc.Id}})
	}
	if file.err == nil {
		// Ensure the unique (files_id, n) index GridFS readers rely on.
		index := Index{
			Key:    []string{"files_id", "n"},
			Unique: true,
		}
		file.err = file.gfs.Chunks.EnsureIndex(index)
	}
}
543
544// Abort cancels an in-progress write, preventing the file from being
545// automically created and ensuring previously written chunks are
546// removed when the file is closed.
547//
548// It is a runtime error to call Abort when the file was not opened
549// for writing.
550func (file *GridFile) Abort() {
551	if file.mode != gfsWriting {
552		panic("file.Abort must be called on file opened for writing")
553	}
554	file.err = errors.New("write aborted")
555}
556
557// Write writes the provided data to the file and returns the
558// number of bytes written and an error in case something
559// wrong happened.
560//
561// The file will internally cache the data so that all but the last
562// chunk sent to the database have the size defined by SetChunkSize.
563// This also means that errors may be deferred until a future call
564// to Write or Close.
565//
566// The parameters and behavior of this function turn the file
567// into an io.Writer.
568func (file *GridFile) Write(data []byte) (n int, err error) {
569	file.assertMode(gfsWriting)
570	file.m.Lock()
571	debugf("GridFile %p: writing %d bytes", file, len(data))
572	defer file.m.Unlock()
573
574	if file.err != nil {
575		return 0, file.err
576	}
577
578	n = len(data)
579	file.doc.Length += int64(n)
580	chunkSize := file.doc.ChunkSize
581
582	if len(file.wbuf)+len(data) < chunkSize {
583		file.wbuf = append(file.wbuf, data...)
584		return
585	}
586
587	// First, flush file.wbuf complementing with data.
588	if len(file.wbuf) > 0 {
589		missing := chunkSize - len(file.wbuf)
590		if missing > len(data) {
591			missing = len(data)
592		}
593		file.wbuf = append(file.wbuf, data[:missing]...)
594		data = data[missing:]
595		file.insertChunk(file.wbuf)
596		file.wbuf = file.wbuf[0:0]
597	}
598
599	// Then, flush all chunks from data without copying.
600	for len(data) > chunkSize {
601		size := chunkSize
602		if size > len(data) {
603			size = len(data)
604		}
605		file.insertChunk(data[:size])
606		data = data[size:]
607	}
608
609	// And append the rest for a future call.
610	file.wbuf = append(file.wbuf, data...)
611
612	return n, file.err
613}
614
615func (file *GridFile) insertChunk(data []byte) {
616	n := file.chunk
617	file.chunk++
618	debugf("GridFile %p: adding to checksum: %q", file, string(data))
619	file.wsum.Write(data)
620
621	for file.doc.ChunkSize*file.wpending >= 1024*1024 {
622		// Hold on.. we got a MB pending.
623		file.c.Wait()
624		if file.err != nil {
625			return
626		}
627	}
628
629	file.wpending++
630
631	debugf("GridFile %p: inserting chunk %d with %d bytes", file, n, len(data))
632
633	// We may not own the memory of data, so rather than
634	// simply copying it, we'll marshal the document ahead of time.
635	data, err := bson.Marshal(gfsChunk{bson.NewObjectId(), file.doc.Id, n, data})
636	if err != nil {
637		file.err = err
638		return
639	}
640
641	go func() {
642		err := file.gfs.Chunks.Insert(bson.Raw{Data: data})
643		file.m.Lock()
644		file.wpending--
645		if err != nil && file.err == nil {
646			file.err = err
647		}
648		file.c.Broadcast()
649		file.m.Unlock()
650	}()
651}
652
653// Seek sets the offset for the next Read or Write on file to
654// offset, interpreted according to whence: 0 means relative to
655// the origin of the file, 1 means relative to the current offset,
656// and 2 means relative to the end. It returns the new offset and
657// an error, if any.
658func (file *GridFile) Seek(offset int64, whence int) (pos int64, err error) {
659	file.m.Lock()
660	debugf("GridFile %p: seeking for %s (whence=%d)", file, offset, whence)
661	defer file.m.Unlock()
662	switch whence {
663	case os.SEEK_SET:
664	case os.SEEK_CUR:
665		offset += file.offset
666	case os.SEEK_END:
667		offset += file.doc.Length
668	default:
669		panic("unsupported whence value")
670	}
671	if offset > file.doc.Length {
672		return file.offset, errors.New("seek past end of file")
673	}
674	if offset == file.doc.Length {
675		// If we're seeking to the end of the file,
676		// no need to read anything. This enables
677		// a client to find the size of the file using only the
678		// io.ReadSeeker interface with low overhead.
679		file.offset = offset
680		return file.offset, nil
681	}
682	chunk := int(offset / int64(file.doc.ChunkSize))
683	if chunk+1 == file.chunk && offset >= file.offset {
684		file.rbuf = file.rbuf[int(offset-file.offset):]
685		file.offset = offset
686		return file.offset, nil
687	}
688	file.offset = offset
689	file.chunk = chunk
690	file.rbuf = nil
691	file.rbuf, err = file.getChunk()
692	if err == nil {
693		file.rbuf = file.rbuf[int(file.offset-int64(chunk)*int64(file.doc.ChunkSize)):]
694	}
695	return file.offset, err
696}
697
698// Read reads into b the next available data from the file and
699// returns the number of bytes written and an error in case
700// something wrong happened.  At the end of the file, n will
701// be zero and err will be set to io.EOF.
702//
703// The parameters and behavior of this function turn the file
704// into an io.Reader.
705func (file *GridFile) Read(b []byte) (n int, err error) {
706	file.assertMode(gfsReading)
707	file.m.Lock()
708	debugf("GridFile %p: reading at offset %d into buffer of length %d", file, file.offset, len(b))
709	defer file.m.Unlock()
710	if file.offset == file.doc.Length {
711		return 0, io.EOF
712	}
713	for err == nil {
714		i := copy(b, file.rbuf)
715		n += i
716		file.offset += int64(i)
717		file.rbuf = file.rbuf[i:]
718		if i == len(b) || file.offset == file.doc.Length {
719			break
720		}
721		b = b[i:]
722		file.rbuf, err = file.getChunk()
723	}
724	return n, err
725}
726
// getChunk returns the data for the chunk file.chunk points at, taken
// from the read-ahead cache when it matches or fetched from the
// database otherwise, and then schedules a background fetch of the
// following chunk when one exists. Must be called with file.m held.
func (file *GridFile) getChunk() (data []byte, err error) {
	cache := file.rcache
	file.rcache = nil
	if cache != nil && cache.n == file.chunk {
		debugf("GridFile %p: Getting chunk %d from cache", file, file.chunk)
		// Blocks until the background fetch goroutine has populated
		// the cache and released wait.
		cache.wait.Lock()
		data, err = cache.data, cache.err
	} else {
		debugf("GridFile %p: Fetching chunk %d", file, file.chunk)
		var doc gfsChunk
		err = file.gfs.Chunks.Find(bson.D{{"files_id", file.doc.Id}, {"n", file.chunk}}).One(&doc)
		data = doc.Data
	}
	file.chunk++
	if int64(file.chunk)*int64(file.doc.ChunkSize) < file.doc.Length {
		// Read the next one in background.
		cache = &gfsCachedChunk{n: file.chunk}
		// Hold wait until the fetch completes; readers above (and
		// Close) block on it to synchronize with the goroutine.
		cache.wait.Lock()
		debugf("GridFile %p: Scheduling chunk %d for background caching", file, file.chunk)
		// Clone the session to avoid having it closed in between.
		chunks := file.gfs.Chunks
		session := chunks.Database.Session.Clone()
		go func(id interface{}, n int) {
			defer session.Close()
			chunks = chunks.With(session)
			var doc gfsChunk
			cache.err = chunks.Find(bson.D{{"files_id", id}, {"n", n}}).One(&doc)
			cache.data = doc.Data
			// Publish the result to whoever locks cache.wait.
			cache.wait.Unlock()
		}(file.doc.Id, file.chunk)
		file.rcache = cache
	}
	debugf("Returning err: %#v", err)
	return
}
762