1"""Test script for the dbm.open function based on testdumbdbm.py"""
2
3import unittest
4import dbm
5import os
6from test.support import import_helper
7from test.support import os_helper
8
9try:
10    from dbm import ndbm
11except ImportError:
12    ndbm = None
13
14dirname = os_helper.TESTFN
15_fname = os.path.join(dirname, os_helper.TESTFN)
16
17#
18# Iterates over every database module supported by dbm currently available.
19#
20def dbm_iterator():
21    for name in dbm._names:
22        try:
23            mod = __import__(name, fromlist=['open'])
24        except ImportError:
25            continue
26        dbm._modules[name] = mod
27        yield mod
28
29#
30# Clean up all scratch databases we might have created during testing
31#
32def cleaunup_test_dir():
33    os_helper.rmtree(dirname)
34
35def setup_test_dir():
36    cleaunup_test_dir()
37    os.mkdir(dirname)
38
39
40class AnyDBMTestCase:
41    _dict = {'a': b'Python:',
42             'b': b'Programming',
43             'c': b'the',
44             'd': b'way',
45             'f': b'Guido',
46             'g': b'intended',
47             }
48
49    def init_db(self):
50        f = dbm.open(_fname, 'n')
51        for k in self._dict:
52            f[k.encode("ascii")] = self._dict[k]
53        f.close()
54
55    def keys_helper(self, f):
56        keys = sorted(k.decode("ascii") for k in f.keys())
57        dkeys = sorted(self._dict.keys())
58        self.assertEqual(keys, dkeys)
59        return keys
60
61    def test_error(self):
62        self.assertTrue(issubclass(self.module.error, OSError))
63
64    def test_anydbm_not_existing(self):
65        self.assertRaises(dbm.error, dbm.open, _fname)
66
67    def test_anydbm_creation(self):
68        f = dbm.open(_fname, 'c')
69        self.assertEqual(list(f.keys()), [])
70        for key in self._dict:
71            f[key.encode("ascii")] = self._dict[key]
72        self.read_helper(f)
73        f.close()
74
75    def test_anydbm_creation_n_file_exists_with_invalid_contents(self):
76        # create an empty file
77        os_helper.create_empty_file(_fname)
78        with dbm.open(_fname, 'n') as f:
79            self.assertEqual(len(f), 0)
80
81    def test_anydbm_modification(self):
82        self.init_db()
83        f = dbm.open(_fname, 'c')
84        self._dict['g'] = f[b'g'] = b"indented"
85        self.read_helper(f)
86        # setdefault() works as in the dict interface
87        self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
88        self.assertEqual(f[b'xxx'], b'foo')
89        f.close()
90
91    def test_anydbm_read(self):
92        self.init_db()
93        f = dbm.open(_fname, 'r')
94        self.read_helper(f)
95        # get() works as in the dict interface
96        self.assertEqual(f.get(b'a'), self._dict['a'])
97        self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
98        self.assertIsNone(f.get(b'xxx'))
99        with self.assertRaises(KeyError):
100            f[b'xxx']
101        f.close()
102
103    def test_anydbm_keys(self):
104        self.init_db()
105        f = dbm.open(_fname, 'r')
106        keys = self.keys_helper(f)
107        f.close()
108
109    def test_empty_value(self):
110        if getattr(dbm._defaultmod, 'library', None) == 'Berkeley DB':
111            self.skipTest("Berkeley DB doesn't distinguish the empty value "
112                          "from the absent one")
113        f = dbm.open(_fname, 'c')
114        self.assertEqual(f.keys(), [])
115        f[b'empty'] = b''
116        self.assertEqual(f.keys(), [b'empty'])
117        self.assertIn(b'empty', f)
118        self.assertEqual(f[b'empty'], b'')
119        self.assertEqual(f.get(b'empty'), b'')
120        self.assertEqual(f.setdefault(b'empty'), b'')
121        f.close()
122
123    def test_anydbm_access(self):
124        self.init_db()
125        f = dbm.open(_fname, 'r')
126        key = "a".encode("ascii")
127        self.assertIn(key, f)
128        assert(f[key] == b"Python:")
129        f.close()
130
131    def test_open_with_bytes(self):
132        dbm.open(os.fsencode(_fname), "c").close()
133
134    def test_open_with_pathlib_path(self):
135        dbm.open(os_helper.FakePath(_fname), "c").close()
136
137    def test_open_with_pathlib_path_bytes(self):
138        dbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close()
139
140    def read_helper(self, f):
141        keys = self.keys_helper(f)
142        for key in self._dict:
143            self.assertEqual(self._dict[key], f[key.encode("ascii")])
144
145    def test_keys(self):
146        with dbm.open(_fname, 'c') as d:
147            self.assertEqual(d.keys(), [])
148            a = [(b'a', b'b'), (b'12345678910', b'019237410982340912840198242')]
149            for k, v in a:
150                d[k] = v
151            self.assertEqual(sorted(d.keys()), sorted(k for (k, v) in a))
152            for k, v in a:
153                self.assertIn(k, d)
154                self.assertEqual(d[k], v)
155            self.assertNotIn(b'xxx', d)
156            self.assertRaises(KeyError, lambda: d[b'xxx'])
157
158    def setUp(self):
159        self.addCleanup(setattr, dbm, '_defaultmod', dbm._defaultmod)
160        dbm._defaultmod = self.module
161        self.addCleanup(cleaunup_test_dir)
162        setup_test_dir()
163
164
165class WhichDBTestCase(unittest.TestCase):
166    def test_whichdb(self):
167        self.addCleanup(setattr, dbm, '_defaultmod', dbm._defaultmod)
168        _bytes_fname = os.fsencode(_fname)
169        fnames = [_fname, os_helper.FakePath(_fname),
170                  _bytes_fname, os_helper.FakePath(_bytes_fname)]
171        for module in dbm_iterator():
172            # Check whether whichdb correctly guesses module name
173            # for databases opened with "module" module.
174            name = module.__name__
175            setup_test_dir()
176            dbm._defaultmod = module
177            # Try with empty files first
178            with module.open(_fname, 'c'): pass
179            for path in fnames:
180                self.assertEqual(name, self.dbm.whichdb(path))
181            # Now add a key
182            with module.open(_fname, 'w') as f:
183                f[b"1"] = b"1"
184                # and test that we can find it
185                self.assertIn(b"1", f)
186                # and read it
187                self.assertEqual(f[b"1"], b"1")
188            for path in fnames:
189                self.assertEqual(name, self.dbm.whichdb(path))
190
191    @unittest.skipUnless(ndbm, reason='Test requires ndbm')
192    def test_whichdb_ndbm(self):
193        # Issue 17198: check that ndbm which is referenced in whichdb is defined
194        with open(_fname + '.db', 'wb'): pass
195        _bytes_fname = os.fsencode(_fname)
196        fnames = [_fname, os_helper.FakePath(_fname),
197                  _bytes_fname, os_helper.FakePath(_bytes_fname)]
198        for path in fnames:
199            self.assertIsNone(self.dbm.whichdb(path))
200
201    def setUp(self):
202        self.addCleanup(cleaunup_test_dir)
203        setup_test_dir()
204        self.dbm = import_helper.import_fresh_module('dbm')
205
206
207for mod in dbm_iterator():
208    assert mod.__name__.startswith('dbm.')
209    suffix = mod.__name__[4:]
210    testname = f'TestCase_{suffix}'
211    globals()[testname] = type(testname,
212                               (AnyDBMTestCase, unittest.TestCase),
213                               {'module': mod})
214
215
216if __name__ == "__main__":
217    unittest.main()
218