1"""Miscellaneous bsddb module test cases
2"""
3
4import os, sys
5import unittest
6
7from test_all import db, dbshelve, hashopen, test_support, get_new_environment_path, get_new_database_path
8
9#----------------------------------------------------------------------
10
class MiscTestCase(unittest.TestCase):
    """Assorted regression tests for the bsddb bindings.

    Every test gets a fresh database path (``self.filename``) and a fresh
    environment directory (``self.homeDir``); both are removed in
    ``tearDown``.

    Test methods are documented with comments rather than docstrings so the
    test names shown in verbose unittest output stay unchanged.
    """

    def setUp(self):
        self.filename = get_new_database_path()
        self.homeDir = get_new_environment_path()

    def tearDown(self):
        test_support.unlink(self.filename)
        test_support.rmtree(self.homeDir)

    def test01_badpointer(self):
        # Using a shelf after close() must raise DBError.
        dbs = dbshelve.open(self.filename)
        dbs.close()
        self.assertRaises(db.DBError, dbs.get, "foo")

    def test02_db_home(self):
        env = db.DBEnv()
        try:
            # check for crash fixed when db_home is used before open()
            self.assertIsNone(env.db_home)
            env.open(self.homeDir, db.DB_CREATE)
            if sys.version_info[0] < 3:
                expected_home = self.homeDir
            else:
                expected_home = bytes(self.homeDir, "ascii")
            self.assertEqual(expected_home, env.db_home)
        finally:
            # Release the environment before tearDown removes its directory.
            env.close()

    def test03_repr_closed_db(self):
        # repr() of a closed database is expected to be "{}".
        closed_db = hashopen(self.filename)
        closed_db.close()
        self.assertEqual(repr(closed_db), "{}")

    def test04_repr_db(self):
        # repr() of an open database is compared with the repr() of a dict
        # holding the same items.
        expected = {}
        hash_db = hashopen(self.filename)
        try:
            for i in range(100):
                key, value = repr(i), repr(100 * i)
                hash_db[key] = value
                expected[key] = value
        finally:
            hash_db.close()

        # Reopen so repr() is taken from a freshly opened handle.
        hash_db = hashopen(self.filename)
        try:
            self.assertEqual(repr(hash_db), repr(expected))
        finally:
            hash_db.close()

    # http://sourceforge.net/tracker/index.php?func=detail&aid=1708868&group_id=13900&atid=313900
    #
    # See the bug report for details.
    #
    # The problem was that make_key_dbt() was not allocating a copy of
    # string keys but FREE_DBT() was always being told to free it when the
    # database was opened with DB_THREAD.
    def test05_double_free_make_key_dbt(self):
        # Create the handle outside the try block so a constructor failure
        # is reported as itself instead of a NameError from the finally.
        db1 = db.DB()
        try:
            db1.open(self.filename, None, db.DB_BTREE,
                     db.DB_CREATE | db.DB_THREAD)

            curs = db1.cursor()
            # double free happened during exit from DBC_get
            curs.get("/foo", db.DB_SET)
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test06_key_with_null_bytes(self):
        # Keys containing NUL bytes must be stored and read back intact.
        db1 = db.DB()
        try:
            db1.open(self.filename, None, db.DB_HASH, db.DB_CREATE)
            db1['a'] = 'eh?'
            db1['a\x00'] = 'eh zed.'
            db1['a\x00a'] = 'eh zed eh?'
            db1['aaa'] = 'eh eh eh!'
            keys = sorted(db1.keys())
            self.assertEqual(['a', 'a\x00', 'a\x00a', 'aaa'], keys)
            self.assertEqual(db1['a'], 'eh?')
            self.assertEqual(db1['a\x00'], 'eh zed.')
            self.assertEqual(db1['a\x00a'], 'eh zed eh?')
            self.assertEqual(db1['aaa'], 'eh eh eh!')
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test07_DB_set_flags_persists(self):
        db1 = db.DB()
        try:
            db1.set_flags(db.DB_DUPSORT)
            db1.open(self.filename, db.DB_HASH, db.DB_CREATE)
            db1['a'] = 'eh'
            db1['a'] = 'A'
            self.assertEqual([('a', 'A')], db1.items())
            db1.put('a', 'Aa')
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
            db1.close()
            db1 = db.DB()
            # no set_flags call, we're testing that it reads and obeys
            # the flags on open.
            db1.open(self.filename, db.DB_HASH)
            self.assertEqual([('a', 'A'), ('a', 'Aa')], db1.items())
            # if it read the flags right this will replace all values
            # for key 'a' instead of adding a new one.  (as a dict should)
            db1['a'] = 'new A'
            self.assertEqual([('a', 'new A')], db1.items())
        finally:
            db1.close()
            test_support.unlink(self.filename)

    def test08_ExceptionTypes(self):
        # Every DB*Error exported by the module must derive from DBError,
        # and only the two key-lookup errors may also derive from KeyError.
        self.assertTrue(issubclass(db.DBError, Exception))
        for name, obj in db.__dict__.items():
            if name.startswith("DB") and name.endswith("Error"):
                self.assertTrue(issubclass(obj, db.DBError), msg=name)
                if name not in ("DBKeyEmptyError", "DBNotFoundError"):
                    self.assertFalse(issubclass(obj, KeyError), msg=name)

        # These two exceptions have two bases
        self.assertTrue(issubclass(db.DBKeyEmptyError, KeyError))
        self.assertTrue(issubclass(db.DBNotFoundError, KeyError))
128
129
130#----------------------------------------------------------------------
131
132
def test_suite():
    """Return a suite containing every test method of MiscTestCase.

    Uses TestLoader.loadTestsFromTestCase instead of unittest.makeSuite,
    which is deprecated and has been removed from recent Python versions;
    the default loader collects the same "test*" methods.
    """
    return unittest.TestLoader().loadTestsFromTestCase(MiscTestCase)
135
136
# When run as a script, hand test_suite() to the unittest command-line runner.
if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")
139