1# Copyright 2011 Matt Chaput. All rights reserved. 2# 3# Redistribution and use in source and binary forms, with or without 4# modification, are permitted provided that the following conditions are met: 5# 6# 1. Redistributions of source code must retain the above copyright notice, 7# this list of conditions and the following disclaimer. 8# 9# 2. Redistributions in binary form must reproduce the above copyright 10# notice, this list of conditions and the following disclaimer in the 11# documentation and/or other materials provided with the distribution. 12# 13# THIS SOFTWARE IS PROVIDED BY MATT CHAPUT ``AS IS'' AND ANY EXPRESS OR 14# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 15# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 16# EVENT SHALL MATT CHAPUT OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 17# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 18# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 19# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 20# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 21# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 22# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 23# 24# The views and conclusions contained in the software and documentation are 25# those of the authors and should not be interpreted as representing official 26# policies, either expressed or implied, of Matt Chaput. 
import errno
import os
import sys
from threading import Lock
from shutil import copyfileobj

try:
    import mmap
except ImportError:
    mmap = None

from whoosh.compat import BytesIO, memoryview_
from whoosh.filedb.structfile import BufferFile, StructFile
from whoosh.filedb.filestore import FileStorage, StorageError
from whoosh.system import emptybytes
from whoosh.util import random_name


class CompoundStorage(FileStorage):
    """Read-only FileStorage that presents the sub-files packed inside a
    single "compound" file (a pickled directory plus the concatenated file
    contents) as individual files.
    """

    readonly = True

    def __init__(self, dbfile, use_mmap=True, basepos=0):
        """
        :param dbfile: a StructFile containing the compound data.
        :param use_mmap: if True (and the platform supports it), try to
            memory-map the whole file instead of keeping the handle open.
        :param basepos: offset in ``dbfile`` where the compound header
            (directory offset/length) starts.
        """
        self._file = dbfile
        # Remember the wrapped file's name (if it has one) for __repr__.
        # Fixes an AttributeError: __repr__ read self._name but nothing
        # ever assigned it.
        self._name = getattr(dbfile, "name", None)
        self.is_closed = False

        # Seek to the end to get total file size (to check if mmap is OK)
        dbfile.seek(0, os.SEEK_END)
        filesize = self._file.tell()
        dbfile.seek(basepos)

        # Header: long directory offset, int directory length
        self._diroffset = self._file.read_long()
        self._dirlength = self._file.read_int()
        self._file.seek(self._diroffset)
        # Directory maps sub-file name -> {"offset", "length", "modified"}
        self._dir = self._file.read_pickle()
        self._options = self._file.read_pickle()
        self._locks = {}
        self._source = None

        use_mmap = (
            use_mmap
            and hasattr(self._file, "fileno")  # check file is a real file
            and filesize < sys.maxsize  # check fit on 32-bit Python
        )
        if mmap and use_mmap:
            # Try to open the entire segment as a memory-mapped object
            try:
                fileno = self._file.fileno()
                self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
            except (mmap.error, OSError):
                e = sys.exc_info()[1]
                # If we got an error because there wasn't enough memory to
                # open the map, ignore it and fall through, we'll just use the
                # (slower) "sub-file" implementation
                if e.errno == errno.ENOMEM:
                    pass
                else:
                    raise
            else:
                # If that worked, we can close the file handle we were given
                self._file.close()
                self._file = None

    def __repr__(self):
        return "<%s (%s)>" % (self.__class__.__name__, self._name)

    def close(self):
        """Close the mmap and/or underlying file handle.

        :raises Exception: if the storage was already closed.
        """
        if self.is_closed:
            raise Exception("Already closed")
        self.is_closed = True

        if self._source:
            try:
                self._source.close()
            except BufferError:
                # A memoryview exported from the mmap is still alive, so the
                # map can't be closed now; just drop our reference to it
                del self._source
        if self._file:
            self._file.close()

    def range(self, name):
        """Return the ``(offset, length)`` pair for sub-file ``name``.

        :raises NameError: if ``name`` is not in the directory.
        """
        try:
            fileinfo = self._dir[name]
        except KeyError:
            raise NameError("Unknown file %r" % (name,))
        return fileinfo["offset"], fileinfo["length"]

    def open_file(self, name, *args, **kwargs):
        """Open sub-file ``name`` for reading, using the fastest available
        mechanism (mmap buffer, parent-file subset, or SubFile wrapper).

        :raises StorageError: if the storage was closed.
        """
        if self.is_closed:
            raise StorageError("Storage was closed")

        offset, length = self.range(name)
        if self._source:
            # Create a memoryview/buffer from the mmap
            buf = memoryview_(self._source, offset, length)
            f = BufferFile(buf, name=name)
        elif hasattr(self._file, "subset"):
            f = self._file.subset(offset, length, name=name)
        else:
            f = StructFile(SubFile(self._file, offset, length), name=name)
        return f

    def list(self):
        """Return the names of the sub-files in this storage."""
        return list(self._dir.keys())

    def file_exists(self, name):
        return name in self._dir

    def file_length(self, name):
        info = self._dir[name]
        return info["length"]

    def file_modified(self, name):
        info = self._dir[name]
        return info["modified"]

    def lock(self, name):
        # In-process lock only; the compound file itself is read-only
        if name not in self._locks:
            self._locks[name] = Lock()
        return self._locks[name]

    @staticmethod
    def assemble(dbfile, store, names, **options):
        """Copy the files ``names`` from ``store`` into ``dbfile`` and append
        the directory, producing a complete compound file. Closes ``dbfile``.
        """
        assert names, names

        directory = {}
        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory position
        dbfile.write_int(0)  # Directory length

        # Refuse to pack TOC/segment files into a compound file
        for name in names:
            if name.endswith(".toc") or name.endswith(".seg"):
                raise Exception(name)

        # Copy the files into the compound file
        for name in names:
            offset = dbfile.tell()
            length = store.file_length(name)
            modified = store.file_modified(name)
            directory[name] = {"offset": offset, "length": length,
                               "modified": modified}
            f = store.open_file(name)
            copyfileobj(f, dbfile)
            f.close()

        CompoundStorage.write_dir(dbfile, basepos, directory, options)

    @staticmethod
    def write_dir(dbfile, basepos, directory, options=None):
        """Write the pickled ``directory`` and ``options`` at the current
        position, then seek back to ``basepos`` and fill in the header
        written by the caller. Closes ``dbfile``.
        """
        options = options or {}

        dirpos = dbfile.tell()  # Remember the start of the directory
        dbfile.write_pickle(directory)  # Write the directory
        dbfile.write_pickle(options)
        endpos = dbfile.tell()  # Remember the end of the directory
        dbfile.flush()
        dbfile.seek(basepos)  # Seek back to the start
        dbfile.write_long(dirpos)  # Directory position
        dbfile.write_int(endpos - dirpos)  # Directory length

        dbfile.close()


class SubFile(object):
    """A read-only file-like object exposing a byte range of a parent file
    as if it were an independent file.
    """

    def __init__(self, parentfile, offset, length, name=None):
        self._file = parentfile
        self._offset = offset
        self._length = length
        self._end = offset + length
        self._pos = 0

        self.name = name
        self.closed = False

    def close(self):
        # Deliberately does NOT close the parent file; other SubFiles
        # may share it
        self.closed = True

    def subset(self, position, length, name=None):
        """Return a SubFile over a sub-range of this SubFile's range."""
        start = self._offset + position
        end = start + length
        name = name or self.name
        # Bug fix: the comparisons were inverted (offset >= start >= end),
        # which failed for every non-empty range since offset <= end
        assert self._offset <= start <= self._end
        assert self._offset <= end <= self._end
        return SubFile(self._file, self._offset + position, length, name=name)

    def read(self, size=None):
        """Read up to ``size`` bytes (all remaining bytes if None), clamped
        to this sub-file's range.
        """
        if size is None:
            size = self._length - self._pos
        else:
            size = min(size, self._length - self._pos)
        if size < 0:
            size = 0

        if size > 0:
            # Re-seek each time: the parent file position is shared
            self._file.seek(self._offset + self._pos)
            self._pos += size
            return self._file.read(size)
        else:
            return emptybytes

    def readline(self):
        maxsize = self._length - self._pos
        self._file.seek(self._offset + self._pos)
        data = self._file.readline()
        if len(data) > maxsize:
            # Truncate a line that runs past the end of our range
            data = data[:maxsize]
        self._pos += len(data)
        return data

    def seek(self, where, whence=0):
        if whence == 0:  # Absolute
            pos = where
        elif whence == 1:  # Relative
            pos = self._pos + where
        elif whence == 2:  # From end
            # NOTE(review): subtracts ``where`` (a positive value seeks
            # backward from the end), unlike the standard io convention of
            # adding a negative offset — preserved as-is for compatibility
            pos = self._length - where
        else:
            raise ValueError

        self._pos = pos

    def tell(self):
        return self._pos


class CompoundWriter(object):
    """Buffers writes to multiple named sub-files through one temp file,
    then packs them into a compound file (``save_as_compound``) or writes
    them out as separate files (``save_as_files``).
    """

    def __init__(self, tempstorage, buffersize=32 * 1024):
        assert isinstance(buffersize, int)
        self._tempstorage = tempstorage
        self._tempname = "%s.ctmp" % random_name()
        self._temp = tempstorage.create_file(self._tempname, mode="w+b")
        self._buffersize = buffersize
        self._streams = {}

    def create_file(self, name):
        """Return a StructFile that buffers writes for sub-file ``name``."""
        ss = self.SubStream(self._temp, self._buffersize)
        self._streams[name] = ss
        return StructFile(ss)

    def _readback(self):
        # Yield (name, generator-factory) pairs for each buffered stream,
        # then delete the temp file. Each generator must be consumed before
        # the next pair is requested (the temp file is closed at the end).
        temp = self._temp
        for name, substream in self._streams.items():
            substream.close()

            # Bind the loop variable as a default argument so the generator
            # doesn't late-bind to the final iteration's substream
            def gen(substream=substream):
                for f, offset, length in substream.blocks:
                    if f is None:
                        # Block was flushed to the shared temp file
                        f = temp
                    f.seek(offset)
                    yield f.read(length)

            yield (name, gen)
        temp.close()
        self._tempstorage.delete_file(self._tempname)

    def save_as_compound(self, dbfile):
        """Write all buffered sub-files into ``dbfile`` as a compound file."""
        basepos = dbfile.tell()
        dbfile.write_long(0)  # Directory offset
        dbfile.write_int(0)  # Directory length

        directory = {}
        for name, blocks in self._readback():
            filestart = dbfile.tell()
            for block in blocks():
                dbfile.write(block)
            directory[name] = {"offset": filestart,
                               "length": dbfile.tell() - filestart}

        CompoundStorage.write_dir(dbfile, basepos, directory)

    def save_as_files(self, storage, name_fn):
        """Write each buffered sub-file into ``storage`` as a separate file
        named ``name_fn(name)``.
        """
        for name, blocks in self._readback():
            f = storage.create_file(name_fn(name))
            for block in blocks():
                f.write(block)
            f.close()

    class SubStream(object):
        """Write-only stream that keeps small writes in a BytesIO buffer and
        spills large runs to the shared temp file, recording the blocks so
        the bytes can be read back in order later.
        """

        def __init__(self, dbfile, buffersize):
            self._dbfile = dbfile
            self._buffersize = buffersize
            self._buffer = BytesIO()
            # Each entry is (file-or-None, offset, length); None means the
            # shared temp file
            self.blocks = []

        def tell(self):
            return sum(b[2] for b in self.blocks) + self._buffer.tell()

        def write(self, inbytes):
            bio = self._buffer
            buflen = bio.tell()
            length = buflen + len(inbytes)
            if length >= self._buffersize:
                # Spill the buffered bytes plus this write to the temp file
                offset = self._dbfile.tell()
                self._dbfile.write(bio.getvalue()[:buflen])
                self._dbfile.write(inbytes)

                self.blocks.append((None, offset, length))
                # Rewind (don't truncate): future writes overwrite, and we
                # only ever read up to the recorded length
                self._buffer.seek(0)
            else:
                bio.write(inbytes)

        def close(self):
            # Record any bytes still sitting in the in-memory buffer
            bio = self._buffer
            length = bio.tell()
            if length:
                self.blocks.append((bio, 0, length))