1##############################################################################
2#
3# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
4# All Rights Reserved.
5#
6# This software is subject to the provisions of the Zope Public License,
7# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11# FOR A PARTICULAR PURPOSE.
12#
13##############################################################################
14from ZODB.DB import DB
15from ZODB.tests import (
16    BasicStorage,
17    ConflictResolution,
18    HistoryStorage,
19    IteratorStorage,
20    MTStorage,
21    PackableStorage,
22    RevisionStorage,
23    StorageTestBase,
24    Synchronization,
25    )
26
27import os
28if os.environ.get('USE_ZOPE_TESTING_DOCTEST'):
29    from zope.testing import doctest
30else:
31    import doctest
32import random
33import re
34import transaction
35import unittest
36import ZODB.DemoStorage
37import ZODB.tests.hexstorage
38import ZODB.tests.util
39import ZODB.utils
40
41from ZODB.utils import load_current
42
43from zope.testing import renormalizing
44
class DemoStorageTests(
    StorageTestBase.StorageTestBase,
    BasicStorage.BasicStorage,
    ConflictResolution.ConflictResolvingStorage,
    HistoryStorage.HistoryStorage,
    IteratorStorage.ExtendedIteratorStorage,
    IteratorStorage.IteratorStorage,
    MTStorage.MTStorage,
    PackableStorage.PackableStorage,
    RevisionStorage.RevisionStorage,
    Synchronization.SynchronizedStorage,
    ):
    # Runs the shared storage check mixins listed above against a
    # DemoStorage created with no arguments, and adds a few checks that are
    # specific to DemoStorage.  Test methods use the 'check' prefix.

    def setUp(self):
        # Let the common base prepare the test, then install the storage
        # that every check operates on.
        StorageTestBase.StorageTestBase.setUp(self)
        demo = ZODB.DemoStorage.DemoStorage()
        self._storage = demo

    def checkOversizeNote(self):
        # The inherited check covers the common case of a storage that
        # doesn't support huge transaction metadata.  This storage doesn't
        # have that limit, so the check is overridden with a no-op.
        pass

    def checkLoadDelegation(self):
        # Minimal delegation check: a second DemoStorage stacked on top of
        # self._storage must return the same current record for oid z64 as
        # self._storage itself does.
        db = DB(self._storage)  # opening a DB creates object 0
        stacked = ZODB.DemoStorage.DemoStorage(base=self._storage)
        oid = ZODB.utils.z64
        via_stacked = load_current(stacked, oid)
        via_base = load_current(self._storage, oid)
        self.assertEqual(via_stacked, via_base)

    def checkLengthAndBool(self):
        # len() and truthiness of the storage must follow the number of
        # objects stored in it.
        storage = self._storage
        self.assertEqual(len(storage), 0)
        self.assertFalse(storage)

        db = DB(storage)  # opening a DB creates object 0
        self.assertEqual(len(storage), 1)
        self.assertTrue(storage)

        # Store ten more objects, each a fresh instance of the root's class.
        with db.transaction() as conn:
            root = conn.root()
            make_child = root.__class__
            for key in range(10):
                root[key] = make_child()
        self.assertEqual(len(storage), 11)
        self.assertTrue(storage)
        db.close()

    def checkLoadBeforeUndo(self):
        pass  # we don't support undo yet
    checkUndoZombie = checkLoadBeforeUndo

    def checkBaseHistory(self):
        # Feed _checkHistory (inherited) generators that replace
        # self._storage with self._storage.push() at a chosen point while
        # they are being consumed; the storage is popped back afterwards.
        # NOTE(review): presumably push() layers a new storage over the
        # current one, so earlier values end up in the base -- confirm
        # against DemoStorage.push.

        def base_only():
            # The push happens only after all three values were consumed.
            for value in (11, 12, 13):
                yield value
            self._storage = self._storage.push()

        self._checkHistory(base_only())
        self._storage = self._storage.pop()

        def base_and_changes():
            # Two values before the push, two more after it.
            for value in (11, 12):
                yield value
            self._storage = self._storage.push()
            for value in (13, 14):
                yield value

        self._checkHistory(base_and_changes())
        self._storage = self._storage.pop()
108
class DemoStorageHexTests(DemoStorageTests):
    # Same checks as DemoStorageTests, run against a DemoStorage that is
    # wrapped in ZODB.tests.hexstorage.HexStorage.

    def setUp(self):
        StorageTestBase.StorageTestBase.setUp(self)
        demo = ZODB.DemoStorage.DemoStorage()
        self._storage = ZODB.tests.hexstorage.HexStorage(demo)
115
class DemoStorageWrappedBase(DemoStorageTests):
    # Shared fixture for running the DemoStorageTests checks against a
    # DemoStorage layered over an explicit base storage.  Concrete
    # subclasses provide that base through _makeBaseStorage().

    def setUp(self):
        StorageTestBase.StorageTestBase.setUp(self)
        base = self._makeBaseStorage()
        self._base = base
        self._storage = ZODB.DemoStorage.DemoStorage(base=base)

    def tearDown(self):
        # Close the base storage first, then run the common tear-down.
        self._base.close()
        StorageTestBase.StorageTestBase.tearDown(self)

    def _makeBaseStorage(self):
        # Hook: subclasses return the storage to use as the base.
        raise NotImplementedError

    def checkPackOnlyOneObject(self):
        pass  # Wrapping demo storages don't do gc

    def checkPackWithMultiDatabaseReferences(self):
        pass  # we never do gc
    checkPackAllRevisions = checkPackWithMultiDatabaseReferences
136
class DemoStorageWrappedAroundMappingStorage(DemoStorageWrappedBase):
    # Runs the wrapped-base checks with a MappingStorage as the base.

    def _makeBaseStorage(self):
        from ZODB.MappingStorage import MappingStorage
        base = MappingStorage()
        return base
142
class DemoStorageWrappedAroundFileStorage(DemoStorageWrappedBase):
    # Runs the wrapped-base checks with a FileStorage as the base.

    def _makeBaseStorage(self):
        from ZODB.FileStorage import FileStorage
        # The file name is relative, so the file is created in whatever
        # directory is current when the test runs.
        path = 'FileStorageTests.fs'
        return FileStorage(path)
148
class DemoStorageWrappedAroundHexMappingStorage(DemoStorageWrappedBase):
    # Runs the wrapped-base checks with a HexStorage-wrapped MappingStorage
    # as the base.

    def _makeBaseStorage(self):
        from ZODB.MappingStorage import MappingStorage
        inner = MappingStorage()
        return ZODB.tests.hexstorage.HexStorage(inner)
154
155
def setUp(test):
    # Doctest set-up hook: seed the random module with a fixed value, then
    # run the shared ZODB test set-up for this doctest.
    fixed_seed = 0
    random.seed(fixed_seed)
    ZODB.tests.util.setUp(test)
159
def testSomeDelegation():
    # Doctest-only function: it has no body, and the docstring below is the
    # test.  It is left untouched because every line of it is either
    # executed or compared against output by doctest.
    #
    # The stub class S prints its name whenever getSize, close or tpc_begin
    # is called.  A DemoStorage is built with S(1) as base and S(2) as
    # changes, and the expected output shows that:
    #   * getSize() reaches only the changes storage ("2 size");
    #   * close() reaches the base and then the changes storage;
    #   * tpc_begin() passes its tid and status through ("begin 2 3").
    # The docstring uses the module-level name ZODB without importing it.
    r"""
    >>> import six
    >>> class S(object):
    ...     def __init__(self, name):
    ...         self.name = name
    ...     def getSize(self):
    ...         six.print_(self.name, 'size')
    ...     def close(self):
    ...         six.print_(self.name, 'closed')
    ...     sortKey = __len__ = getTid = None
    ...     tpc_finish = tpc_vote = tpc_transaction = None
    ...     _lock = ZODB.utils.Lock()
    ...     getName = lambda self: 'S'
    ...     isReadOnly = tpc_transaction = None
    ...     supportsUndo = undo = undoLog = undoInfo = None
    ...     supportsTransactionalUndo = None
    ...     def new_oid(self):
    ...         return '\0' * 8
    ...     def tpc_begin(self, t, tid, status):
    ...         six.print_('begin', tid, status)
    ...     def tpc_abort(self, t):
    ...         pass

    >>> from ZODB.DemoStorage import DemoStorage
    >>> storage = DemoStorage(base=S(1), changes=S(2))

    >>> storage.getSize()
    2 size

    >>> storage.close()
    1 closed
    2 closed

    >>> storage.tpc_begin(1, 2, 3)
    begin 2 3
    >>> storage.tpc_abort(1)

    >>>

    """
201
def blob_pos_key_error_with_non_blob_base():
    # Doctest-only function: the docstring below is the test and is left
    # untouched.
    #
    # On a DemoStorage created with no arguments, both loadBlob() and
    # openCommittedBlobFile() are expected to raise POSKeyError for oid 1.
    # NOTE(review): going by the function name, the default base storage
    # has no blob support -- confirm against DemoStorage.
    """
    >>> storage = ZODB.DemoStorage.DemoStorage()
    >>> storage.loadBlob(ZODB.utils.p64(1), ZODB.utils.p64(1))
    Traceback (most recent call last):
    ...
    POSKeyError: 0x01

    >>> storage.openCommittedBlobFile(ZODB.utils.p64(1), ZODB.utils.p64(1))
    Traceback (most recent call last):
    ...
    POSKeyError: 0x01

    """
216
def load_before_base_storage_current():
    # Doctest-only function: the docstring below is the test and is left
    # untouched.
    #
    # Outline: commit 'bar' under the root key 'foo' into a MappingStorage,
    # layer a DemoStorage over it and commit 'baz' there, then call
    # loadBefore() with a tid one past the base's current record.  The
    # result must carry the base's data and tid, with the changes storage's
    # tid as the end-of-validity value.
    #
    # load_current is not imported inside the docstring; it is the
    # module-level name imported from ZODB.utils.
    """
    Here we'll exercise that DemoStorage's loadBefore method works
    properly when deferring to a record that is current in the
    base storage.

    >>> import time
    >>> import transaction
    >>> import ZODB.DB
    >>> import ZODB.DemoStorage
    >>> import ZODB.MappingStorage
    >>> import ZODB.utils

    >>> base = ZODB.MappingStorage.MappingStorage()
    >>> basedb = ZODB.DB(base)
    >>> conn = basedb.open()
    >>> conn.root()['foo'] = 'bar'
    >>> transaction.commit()
    >>> conn.close()
    >>> storage = ZODB.DemoStorage.DemoStorage(base=base)
    >>> db = ZODB.DB(storage)
    >>> conn = db.open()
    >>> conn.root()['foo'] = 'baz'
    >>> time.sleep(.1) # Windows has a low-resolution clock
    >>> transaction.commit()

    >>> oid = ZODB.utils.z64
    >>> base_current = load_current(storage.base, oid)
    >>> tid = ZODB.utils.p64(ZODB.utils.u64(base_current[1]) + 1)
    >>> base_record = storage.base.loadBefore(oid, tid)
    >>> base_record[-1] is None
    True
    >>> base_current == base_record[:2]
    True

    >>> t = storage.loadBefore(oid, tid)

    The data and tid are the values from the base storage, but the
    next tid is from changes.

    >>> t[:2] == base_record[:2]
    True
    >>> t[-1] == load_current(storage.changes, oid)[1]
    True

    >>> conn.close()
    >>> db.close()
    >>> base.close()
    """
266
def test_suite():
    # Build and return the unittest.TestSuite for this module.  It holds,
    # in order:
    #   1. the doctests found in this module's docstrings,
    #   2. the doctest file '../DemoStorage.test',
    #   3. the 'check'-prefixed methods of each concrete test class.
    # Both doctest suites share this module's setUp() hook and the
    # tearDown and output checker from ZODB.tests.util.
    suite = unittest.TestSuite((
        doctest.DocTestSuite(
            setUp=setUp, tearDown=ZODB.tests.util.tearDown,
            checker=ZODB.tests.util.checker
            ),
        doctest.DocFileSuite(
            '../DemoStorage.test',
            setUp=setUp,
            tearDown=ZODB.tests.util.tearDown,
            checker=ZODB.tests.util.checker,
            ),
        ))

    # unittest.makeSuite() was deprecated in Python 3.11 and removed in
    # 3.13.  A TestLoader with a custom testMethodPrefix collects the same
    # methods and is available on every Python version.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = 'check'

    # DemoStorageWrappedBase is left out on purpose: its _makeBaseStorage()
    # raises NotImplementedError.  The order matches the original suite.
    test_classes = (
        DemoStorageTests,
        DemoStorageHexTests,
        DemoStorageWrappedAroundFileStorage,
        DemoStorageWrappedAroundMappingStorage,
        DemoStorageWrappedAroundHexMappingStorage,
        )
    for test_class in test_classes:
        suite.addTest(loader.loadTestsFromTestCase(test_class))
    return suite
289