# Copyright 2011 Matt Chaput. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    1. Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#
#    2. Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of Matt Chaput.
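"""Support for "compound" index files.

This module provides CompoundStorage, a read-only FileStorage that serves a
number of logical sub-files out of a single physical file (through a memory
map when possible), and CompoundWriter, which buffers a number of output
streams and writes them back out either as one compound file or as separate
files.
"""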
import errno
import os
import sys
from threading import Lock
from shutil import copyfileobj

try:
    import mmap
except ImportError:
    mmap = None

from whoosh.compat import BytesIO, memoryview_
from whoosh.filedb.structfile import BufferFile, StructFile
from whoosh.filedb.filestore import FileStorage, StorageError
from whoosh.system import emptybytes
from whoosh.util import random_name


class CompoundStorage(FileStorage):
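    """Presents the contents of a single "compound" file as a read-only
    FileStorage object.

    The compound format is: a long (the directory offset) and an int (the
    directory length) at basepos, followed by the concatenated contents of
    the sub-files, followed by a pickled directory dict mapping each sub-file
    name to its offset, length, and (optionally) modification time, and a
    pickled options dict.
    """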
    readonly = True

    def __init__(self, dbfile, use_mmap=True, basepos=0):
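        """
        :param dbfile: the file (normally a StructFile) containing the
            compound data.
        :param use_mmap: if True (the default), try to memory-map the whole
            file instead of seeking around in it.
        :param basepos: the position in dbfile at which the compound data
            starts.
        """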
        self._file = dbfile
        # Remember a display name for __repr__ (None if the file object
        # doesn't expose one)
        self._name = getattr(dbfile, "name", None)
        self.is_closed = False

        # Seek to the end to get total file size (to check if mmap is OK)
        dbfile.seek(0, os.SEEK_END)
        filesize = self._file.tell()
        dbfile.seek(basepos)

        self._diroffset = self._file.read_long()
        self._dirlength = self._file.read_int()
        self._file.seek(self._diroffset)
        self._dir = self._file.read_pickle()
        self._options = self._file.read_pickle()
        self._locks = {}
        self._source = None

        use_mmap = (
            use_mmap
            and hasattr(self._file, "fileno")  # check file is a real file
            and filesize < sys.maxsize  # check it fits on 32-bit Python
        )
        if mmap and use_mmap:
            # Try to open the entire segment as a memory-mapped object
            try:
                fileno = self._file.fileno()
                self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
            except (mmap.error, OSError) as e:
                # If we got an error because there wasn't enough memory to
                # open the map, ignore it and fall through, we'll just use the
                # (slower) "sub-file" implementation
                if e.errno == errno.ENOMEM:
                    pass
                else:
                    raise
            else:
                # If that worked, we can close the file handle we were given
                self._file.close()
                self._file = None

    def __repr__(self):
        return "<%s (%s)>" % (self.__class__.__name__, self._name)

    def close(self):
        if self.is_closed:
            raise Exception("Already closed")
        self.is_closed = True

        if self._source:
            try:
                self._source.close()
            except BufferError:
                # The mmap can't be closed while exported memoryviews still
                # reference it; just drop our reference instead
                del self._source
        if self._file:
            self._file.close()

    def range(self, name):
        try:
            fileinfo = self._dir[name]
        except KeyError:
            raise NameError("Unknown file %r" % (name,))
        return fileinfo["offset"], fileinfo["length"]

    def open_file(self, name, *args, **kwargs):
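        """Returns a read-only file object for the named sub-file, backed by
        the memory map if one is available, otherwise by a SubFile wrapper
        around the underlying file.
        """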
        if self.is_closed:
            raise StorageError("Storage was closed")

        offset, length = self.range(name)
        if self._source:
            # Create a memoryview/buffer from the mmap
            buf = memoryview_(self._source, offset, length)
            f = BufferFile(buf, name=name)
        elif hasattr(self._file, "subset"):
            f = self._file.subset(offset, length, name=name)
        else:
            f = StructFile(SubFile(self._file, offset, length), name=name)
        return f

    def list(self):
        return list(self._dir.keys())

    def file_exists(self, name):
        return name in self._dir

    def file_length(self, name):
        info = self._dir[name]
        return info["length"]

    def file_modified(self, name):
        info = self._dir[name]
        return info["modified"]

    def lock(self, name):
        if name not in self._locks:
            self._locks[name] = Lock()
        return self._locks[name]

    @staticmethod
    def assemble(dbfile, store, names, **options):
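        """Copies the named files out of the given storage object and
        concatenates them into dbfile in compound-file format, then writes
        the directory with write_dir().
        """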
        assert names, names

        directory = {}
        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory position
        dbfile.write_int(0)  # Directory length

        # TOC and segment files should never be stored inside a compound file
        for name in names:
            if name.endswith(".toc") or name.endswith(".seg"):
                raise Exception("Can't add %r to a compound file" % (name,))

        # Copy the files into the compound file
        for name in names:
            offset = dbfile.tell()
            length = store.file_length(name)
            modified = store.file_modified(name)
            directory[name] = {"offset": offset, "length": length,
                               "modified": modified}
            f = store.open_file(name)
            copyfileobj(f, dbfile)
            f.close()

        CompoundStorage.write_dir(dbfile, basepos, directory, options)

    @staticmethod
    def write_dir(dbfile, basepos, directory, options=None):
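        """Writes the pickled directory (and options) at the current position
        in dbfile, goes back and fills in the directory position and length
        at basepos, and closes dbfile.
        """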
        options = options or {}

        dirpos = dbfile.tell()  # Remember the start of the directory
        dbfile.write_pickle(directory)  # Write the directory
        dbfile.write_pickle(options)
        endpos = dbfile.tell()  # Remember the end of the directory
        dbfile.flush()
        dbfile.seek(basepos)  # Seek back to the start
        dbfile.write_long(dirpos)  # Directory position
        dbfile.write_int(endpos - dirpos)  # Directory length

        dbfile.close()


class SubFile(object):
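    """A read-only file-like object that presents a byte range of a larger
    file as if it were a separate file.
    """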
    def __init__(self, parentfile, offset, length, name=None):
        self._file = parentfile
        self._offset = offset
        self._length = length
        self._end = offset + length
        self._pos = 0

        self.name = name
        self.closed = False

    def close(self):
        self.closed = True

    def subset(self, position, length, name=None):
        start = self._offset + position
        end = start + length
        name = name or self.name
        # The requested range must lie inside this sub-file
        assert self._offset <= start <= self._end
        assert self._offset <= end <= self._end
        return SubFile(self._file, start, length, name=name)

    def read(self, size=None):
        if size is None:
            size = self._length - self._pos
        else:
            size = min(size, self._length - self._pos)
        if size < 0:
            size = 0

        if size > 0:
            self._file.seek(self._offset + self._pos)
            self._pos += size
            return self._file.read(size)
        else:
            return emptybytes

    def readline(self):
        maxsize = self._length - self._pos
        self._file.seek(self._offset + self._pos)
        data = self._file.readline()
        if len(data) > maxsize:
            data = data[:maxsize]
        self._pos += len(data)
        return data

    def seek(self, where, whence=0):
        if whence == 0:  # Absolute
            pos = where
        elif whence == 1:  # Relative
            pos = self._pos + where
        elif whence == 2:  # From end: where is a distance back from the end
            pos = self._length - where
        else:
            raise ValueError("Unknown whence value %r" % (whence,))

        self._pos = pos

    def tell(self):
        return self._pos


class CompoundWriter(object):
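    """Collects a number of output streams in a single shared temporary file,
    then writes them back out either concatenated into a compound file
    (save_as_compound) or as separate files (save_as_files).
    """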
    def __init__(self, tempstorage, buffersize=32 * 1024):
        assert isinstance(buffersize, int)
        self._tempstorage = tempstorage
        self._tempname = "%s.ctmp" % random_name()
        self._temp = tempstorage.create_file(self._tempname, mode="w+b")
        self._buffersize = buffersize
        self._streams = {}

    def create_file(self, name):
        ss = self.SubStream(self._temp, self._buffersize)
        self._streams[name] = ss
        return StructFile(ss)

    def _readback(self):
        # Generates (name, gen) pairs, where gen() yields the contents of the
        # named stream block by block. Each gen() must be consumed before
        # advancing to the next name, because it closes over the loop
        # variable and reads from the shared temp file.
        temp = self._temp
        for name, substream in self._streams.items():
            substream.close()

            def gen():
                for f, offset, length in substream.blocks:
                    if f is None:
                        f = temp
                    f.seek(offset)
                    yield f.read(length)

            yield (name, gen)
        temp.close()
        self._tempstorage.delete_file(self._tempname)

    def save_as_compound(self, dbfile):
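        """Writes the buffered streams into dbfile in compound-file format
        and closes dbfile (via CompoundStorage.write_dir).
        """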
        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory offset
        dbfile.write_int(0)  # Directory length

        directory = {}
        for name, blocks in self._readback():
            filestart = dbfile.tell()
            for block in blocks():
                dbfile.write(block)
            directory[name] = {"offset": filestart,
                               "length": dbfile.tell() - filestart}

        CompoundStorage.write_dir(dbfile, basepos, directory)

    def save_as_files(self, storage, name_fn):
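        """Writes each buffered stream to a separate file in the given
        storage, using name_fn(name) to get the name of the file to create.
        """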
        for name, blocks in self._readback():
            f = storage.create_file(name_fn(name))
            for block in blocks():
                f.write(block)
            f.close()

    class SubStream(object):
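        """A buffered writer for one logical file. Small writes accumulate in
        memory; once the buffer would reach buffersize, the buffered bytes
        plus the incoming bytes are flushed to the shared temp file. Each run
        of data is recorded in self.blocks as a (file, offset, length) tuple,
        where file is None for data written to the temp file.
        """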
        def __init__(self, dbfile, buffersize):
            self._dbfile = dbfile
            self._buffersize = buffersize
            self._buffer = BytesIO()
            self.blocks = []

        def tell(self):
            return sum(b[2] for b in self.blocks) + self._buffer.tell()

        def write(self, inbytes):
            bio = self._buffer
            buflen = bio.tell()
            length = buflen + len(inbytes)
            if length >= self._buffersize:
                # Flush the buffered bytes and the new bytes to the temp file
                # and record where they went
                offset = self._dbfile.tell()
                self._dbfile.write(bio.getvalue()[:buflen])
                self._dbfile.write(inbytes)

                self.blocks.append((None, offset, length))
                self._buffer.seek(0)
            else:
                bio.write(inbytes)

        def close(self):
            # Record whatever is left in the in-memory buffer as a final block
            bio = self._buffer
            length = bio.tell()
            if length:
                self.blocks.append((bio, 0, length))