1"""
2$URL: svn+ssh://svn.mems-exchange.org/repos/trunk/durus/test/utest_file_storage.py $
3$Id: utest_file_storage.py 31610 2009-05-03 22:07:44Z dbinger $
4"""
5from durus.connection import Connection
6from durus.file import File
7from durus.file_storage import TempFileStorage, FileStorage
8from durus.logger import direct_output
9from durus.persistent import Persistent
10from durus.serialize import pack_record
11from durus.utils import int8_to_str, ShortRead, write_int4_str, as_bytes
12from os import unlink
13from sancho.utest import UTest, raises
14from tempfile import mktemp
15import os
16import sys
17
18
19class FileStorageTest (UTest):
20
21    def _pre(self):
22        direct_output(sys.stdout)
23
24    def check_file_storage(self):
25        name = mktemp()
26        b = FileStorage(name)
27        assert b.new_oid() == int8_to_str(0)
28        assert b.new_oid() == int8_to_str(1)
29        assert b.new_oid() == int8_to_str(2)
30        raises(KeyError, b.load, int8_to_str(0))
31        record = pack_record(int8_to_str(0), as_bytes('ok'), as_bytes(''))
32        b.begin()
33        b.store(int8_to_str(0), record)
34        b.end()
35        b.sync()
36        b.begin()
37        b.store(int8_to_str(1), pack_record(
38            int8_to_str(1), as_bytes('no'), as_bytes('')))
39        b.end()
40        assert len(list(b.gen_oid_record(start_oid=int8_to_str(0)))) == 1
41        assert len(list(b.gen_oid_record())) == 2
42        b.pack()
43        b.close()
44        unlink(name + '.prepack')
45        raises(ValueError, b.pack) # storage closed
46        unlink(name + '.pack')
47        raises(ValueError, b.load, int8_to_str(0)) # storage closed
48        unlink(name)
49
50    def check_reopen(self):
51        f = TempFileStorage()
52        filename = f.get_filename()
53        if os.name == 'nt':
54            f.close() # don't try to re-open an open file on windows
55            return
56        g = FileStorage(filename, readonly=True)
57        raises(IOError, FileStorage, filename)
58        f.close()
59        g.close()
60
61    def check_open_empty(self):
62        name = mktemp()
63        f = open(name, 'w')
64        f.close()
65        s = FileStorage(name)
66        s.close()
67        unlink(name)
68
69    def check_short_magic(self):
70        name = mktemp()
71        f = open(name, 'w')
72        f.write('b')
73        f.close()
74        raises(AssertionError, FileStorage, name)
75        unlink(name)
76
77    def check_wrong_magic(self):
78        name = mktemp()
79        f = open(name, 'w')
80        f.write('bogusbogus')
81        f.close()
82        raises(AssertionError, FileStorage, name)
83        unlink(name)
84
85    def check_bad_record_size(self):
86        name = mktemp()
87        f = open(name, 'wb')
88        g = FileStorage(name)
89        f.seek(0, 2)
90        write_int4_str(f, 'ok')
91        g.close()
92        f.close()
93        raises(ShortRead, FileStorage, name)
94        unlink(name)
95
96    def check_repair(self):
97        name = mktemp()
98        g = FileStorage(name)
99        g.close()
100        f = open(name, 'r+b')
101        f.seek(0, 2)
102        p = f.tell()
103        f.write(as_bytes('b'))
104        f.flush()
105        raises(ShortRead, FileStorage, name, readonly=True)
106        h = FileStorage(name, repair=True)
107        f.seek(0, 2)
108        assert p == f.tell()
109        f.close()
110        h.close()
111        unlink(name)
112
113
114class ShelfStorageTest (UTest):
115
116    def a(self):
117        f = File(prefix='shelftest')
118        name = f.get_name()
119        f.close()
120        s = FileStorage(name)
121        c = Connection(s)
122        r = c.get_root()
123        for x in range(10):
124            r["a%s" % x] = Persistent()
125            c.commit()
126        deleted_oids = [
127            r['a0']._p_oid, r['a2']._p_oid, r['a7']._p_oid, r['a8']._p_oid]
128        del r['a0']
129        del r['a2']
130        del r['a7']
131        del r['a8']
132        c.commit()
133        c.pack()
134        c.abort()
135        assert c.get(deleted_oids[0])._p_is_ghost()
136        assert c.get(deleted_oids[1])._p_is_ghost()
137        raises(KeyError, getattr, c.get(deleted_oids[0]), 'a')
138        assert len([repr(oid) for oid, record in s.gen_oid_record()]) == 7
139        c.commit()
140        c.pack()
141        new_oid = s.new_oid()
142        assert new_oid == deleted_oids[-1], (new_oid, deleted_oids)
143        new_oid = s.new_oid()
144        assert new_oid == deleted_oids[-2], (new_oid, deleted_oids)
145        new_oid = s.new_oid()
146        assert new_oid == deleted_oids[-3], (new_oid, deleted_oids)
147        new_oid = s.new_oid()
148        assert new_oid == deleted_oids[-4], (new_oid, deleted_oids)
149        new_oid = s.new_oid()
150        assert new_oid == int8_to_str(11), repr(new_oid)
151        new_oid = s.new_oid()
152        assert new_oid == int8_to_str(12), repr(new_oid)
153
154    def b(self):
155        f = File(prefix='shelftest')
156        name = f.get_name()
157        f.close()
158        s = FileStorage(name)
159        c = Connection(s)
160        r = c.get_root()
161        for x in range(10):
162            r["a%s" % x] = Persistent()
163            c.commit()
164        deleted_oid = r['a9']._p_oid
165        del r['a9']
166        c.commit()
167        c.pack()
168        c.abort()
169        assert len([repr(oid) for oid, record in s.gen_oid_record()]) == 10
170        new_oid = s.new_oid()
171        assert new_oid == deleted_oid
172        new_oid = s.new_oid()
173        assert new_oid == int8_to_str(11)
174
175    def c(self):
176        f = File(prefix='shelftest')
177        name = f.get_name()
178        f.close()
179        s = FileStorage(name)
180        c = Connection(s)
181        r = c.get_root()
182        for x in range(10):
183            r["a%s" % x] = Persistent()
184            c.commit()
185        deleted_oid = r['a9']._p_oid
186        del r['a9']
187        c.commit()
188        c.pack()
189        c.abort()
190        r.clear()
191        c.commit()
192        c.pack()
193        c.abort()
194        new_oid = s.new_oid()
195        assert new_oid == int8_to_str(1), repr(new_oid)
196        new_oid = s.new_oid()
197        assert new_oid == int8_to_str(2), repr(new_oid)
198
199
200if __name__ == "__main__":
201    FileStorageTest()
202    ShelfStorageTest()
203