1""" 2$URL: svn+ssh://svn.mems-exchange.org/repos/trunk/durus/test/utest_file_storage.py $ 3$Id: utest_file_storage.py 31610 2009-05-03 22:07:44Z dbinger $ 4""" 5from durus.connection import Connection 6from durus.file import File 7from durus.file_storage import TempFileStorage, FileStorage 8from durus.logger import direct_output 9from durus.persistent import Persistent 10from durus.serialize import pack_record 11from durus.utils import int8_to_str, ShortRead, write_int4_str, as_bytes 12from os import unlink 13from sancho.utest import UTest, raises 14from tempfile import mktemp 15import os 16import sys 17 18 19class FileStorageTest (UTest): 20 21 def _pre(self): 22 direct_output(sys.stdout) 23 24 def check_file_storage(self): 25 name = mktemp() 26 b = FileStorage(name) 27 assert b.new_oid() == int8_to_str(0) 28 assert b.new_oid() == int8_to_str(1) 29 assert b.new_oid() == int8_to_str(2) 30 raises(KeyError, b.load, int8_to_str(0)) 31 record = pack_record(int8_to_str(0), as_bytes('ok'), as_bytes('')) 32 b.begin() 33 b.store(int8_to_str(0), record) 34 b.end() 35 b.sync() 36 b.begin() 37 b.store(int8_to_str(1), pack_record( 38 int8_to_str(1), as_bytes('no'), as_bytes(''))) 39 b.end() 40 assert len(list(b.gen_oid_record(start_oid=int8_to_str(0)))) == 1 41 assert len(list(b.gen_oid_record())) == 2 42 b.pack() 43 b.close() 44 unlink(name + '.prepack') 45 raises(ValueError, b.pack) # storage closed 46 unlink(name + '.pack') 47 raises(ValueError, b.load, int8_to_str(0)) # storage closed 48 unlink(name) 49 50 def check_reopen(self): 51 f = TempFileStorage() 52 filename = f.get_filename() 53 if os.name == 'nt': 54 f.close() # don't try to re-open an open file on windows 55 return 56 g = FileStorage(filename, readonly=True) 57 raises(IOError, FileStorage, filename) 58 f.close() 59 g.close() 60 61 def check_open_empty(self): 62 name = mktemp() 63 f = open(name, 'w') 64 f.close() 65 s = FileStorage(name) 66 s.close() 67 unlink(name) 68 69 def check_short_magic(self): 70 name = mktemp() 71 f = open(name, 'w') 72 f.write('b') 73 f.close() 74 raises(AssertionError, FileStorage, name) 75 unlink(name) 76 77 def check_wrong_magic(self): 78 name = mktemp() 79 f = open(name, 'w') 80 f.write('bogusbogus') 81 f.close() 82 raises(AssertionError, FileStorage, name) 83 unlink(name) 84 85 def check_bad_record_size(self): 86 name = mktemp() 87 f = open(name, 'wb') 88 g = FileStorage(name) 89 f.seek(0, 2) 90 write_int4_str(f, 'ok') 91 g.close() 92 f.close() 93 raises(ShortRead, FileStorage, name) 94 unlink(name) 95 96 def check_repair(self): 97 name = mktemp() 98 g = FileStorage(name) 99 g.close() 100 f = open(name, 'r+b') 101 f.seek(0, 2) 102 p = f.tell() 103 f.write(as_bytes('b')) 104 f.flush() 105 raises(ShortRead, FileStorage, name, readonly=True) 106 h = FileStorage(name, repair=True) 107 f.seek(0, 2) 108 assert p == f.tell() 109 f.close() 110 h.close() 111 unlink(name) 112 113 114class ShelfStorageTest (UTest): 115 116 def a(self): 117 f = File(prefix='shelftest') 118 name = f.get_name() 119 f.close() 120 s = FileStorage(name) 121 c = Connection(s) 122 r = c.get_root() 123 for x in range(10): 124 r["a%s" % x] = Persistent() 125 c.commit() 126 deleted_oids = [ 127 r['a0']._p_oid, r['a2']._p_oid, r['a7']._p_oid, r['a8']._p_oid] 128 del r['a0'] 129 del r['a2'] 130 del r['a7'] 131 del r['a8'] 132 c.commit() 133 c.pack() 134 c.abort() 135 assert c.get(deleted_oids[0])._p_is_ghost() 136 assert c.get(deleted_oids[1])._p_is_ghost() 137 raises(KeyError, getattr, c.get(deleted_oids[0]), 'a') 138 assert len([repr(oid) for oid, record in s.gen_oid_record()]) == 7 139 c.commit() 140 c.pack() 141 new_oid = s.new_oid() 142 assert new_oid == deleted_oids[-1], (new_oid, deleted_oids) 143 new_oid = s.new_oid() 144 assert new_oid == deleted_oids[-2], (new_oid, deleted_oids) 145 new_oid = s.new_oid() 146 assert new_oid == deleted_oids[-3], (new_oid, deleted_oids) 147 new_oid = s.new_oid() 148 assert new_oid == deleted_oids[-4], (new_oid, deleted_oids) 149 new_oid = s.new_oid() 150 assert new_oid == int8_to_str(11), repr(new_oid) 151 new_oid = s.new_oid() 152 assert new_oid == int8_to_str(12), repr(new_oid) 153 154 def b(self): 155 f = File(prefix='shelftest') 156 name = f.get_name() 157 f.close() 158 s = FileStorage(name) 159 c = Connection(s) 160 r = c.get_root() 161 for x in range(10): 162 r["a%s" % x] = Persistent() 163 c.commit() 164 deleted_oid = r['a9']._p_oid 165 del r['a9'] 166 c.commit() 167 c.pack() 168 c.abort() 169 assert len([repr(oid) for oid, record in s.gen_oid_record()]) == 10 170 new_oid = s.new_oid() 171 assert new_oid == deleted_oid 172 new_oid = s.new_oid() 173 assert new_oid == int8_to_str(11) 174 175 def c(self): 176 f = File(prefix='shelftest') 177 name = f.get_name() 178 f.close() 179 s = FileStorage(name) 180 c = Connection(s) 181 r = c.get_root() 182 for x in range(10): 183 r["a%s" % x] = Persistent() 184 c.commit() 185 deleted_oid = r['a9']._p_oid 186 del r['a9'] 187 c.commit() 188 c.pack() 189 c.abort() 190 r.clear() 191 c.commit() 192 c.pack() 193 c.abort() 194 new_oid = s.new_oid() 195 assert new_oid == int8_to_str(1), repr(new_oid) 196 new_oid = s.new_oid() 197 assert new_oid == int8_to_str(2), repr(new_oid) 198 199 200if __name__ == "__main__": 201 FileStorageTest() 202 ShelfStorageTest() 203