]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | #! /usr/bin/env python\r |
2 | """Test script for the bsddb C module by Roger E. Masse\r | |
3 | Adapted to unittest format and expanded scope by Raymond Hettinger\r | |
4 | """\r | |
5 | import os, sys\r | |
6 | import unittest\r | |
7 | from test import test_support\r | |
8 | \r | |
9 | # Skip test if _bsddb wasn't built.\r | |
10 | test_support.import_module('_bsddb')\r | |
11 | \r | |
12 | bsddb = test_support.import_module('bsddb', deprecated=True)\r | |
13 | # Just so we know it's imported:\r | |
14 | test_support.import_module('dbhash', deprecated=True)\r | |
15 | \r | |
16 | \r | |
class TestBSDDB(unittest.TestCase):
    """Shared mapping-interface tests for the legacy ``bsddb`` database objects.

    This class is abstract.  Concrete subclasses must define:

    * ``openmethod`` -- a one-element list holding the ``bsddb`` open
      function (it is invoked as ``self.openmethod[0](...)``);
    * ``fname`` -- the database file name, or ``None`` for an in-memory db.

    Every test starts with the database ``self.f`` populated from the
    reference dict ``self.d``, so the two can be compared directly.

    Test methods are documented with comments rather than docstrings so
    that unittest's per-test descriptions are left unchanged.
    """

    # Flag handed to the open function; subclasses may override it.
    openflag = 'c'

    def setUp(self):
        # Open a fresh database and load it with the reference data.
        opener = self.openmethod[0]
        self.f = opener(self.fname, self.openflag, cachesize=32768)
        self.d = dict(q='Guido', w='van', e='Rossum', r='invented',
                      t='Python', y='')
        for k, v in self.d.iteritems():
            self.f[k] = v

    def tearDown(self):
        # Flush and close the database, then remove its backing file.
        # The nested try/finally makes sure close() and the file removal
        # still happen when an earlier cleanup step raises (for example
        # when a test left self.f in an unusable state).
        try:
            try:
                self.f.sync()
            finally:
                self.f.close()
        finally:
            if self.fname is not None:
                try:
                    os.remove(self.fname)
                except os.error:
                    # Best effort: the file may never have been created.
                    pass

    def test_getitem(self):
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def test_len(self):
        self.assertEqual(len(self.f), len(self.d))

    def test_change(self):
        self.f['r'] = 'discovered'
        self.assertEqual(self.f['r'], 'discovered')
        self.assertIn('r', self.f.keys())
        self.assertIn('discovered', self.f.values())

    def test_close_and_reopen(self):
        if self.fname is None:
            # An in-memory-only db cannot be reopened; report the test as
            # skipped instead of silently passing.
            self.skipTest("in-memory databases cannot be reopened")
        self.f.close()
        self.f = self.openmethod[0](self.fname, 'w')
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def assertSetEquals(self, seqn1, seqn2):
        # Order-insensitive comparison of two iterables.
        self.assertEqual(set(seqn1), set(seqn2))

    def test_mapping_iteration_methods(self):
        f = self.f
        d = self.d
        self.assertSetEquals(d, f)
        self.assertSetEquals(d.keys(), f.keys())
        self.assertSetEquals(d.values(), f.values())
        self.assertSetEquals(d.items(), f.items())
        self.assertSetEquals(d.iterkeys(), f.iterkeys())
        self.assertSetEquals(d.itervalues(), f.itervalues())
        self.assertSetEquals(d.iteritems(), f.iteritems())

    def test_iter_while_modifying_values(self):
        # Reference behaviour: a dict allows rewriting the values of
        # existing keys while it is being iterated.
        for key in self.d:
            self.d[key] = 'modified ' + key

        # The db should behave the same as a dict: modifying values of
        # existing keys should not break iteration (adding or removing
        # keys should).
        loops_left = len(self.f)
        for key in self.f:
            self.f[key] = 'modified ' + key
            loops_left -= 1
        self.assertEqual(loops_left, 0)

        self.test_mapping_iteration_methods()

    def test_iter_abort_on_changed_size(self):
        # Adding a key during key iteration must raise RuntimeError, both
        # for the reference dict and for the database.
        def grow_dict_while_iterating():
            for _ in self.d:
                self.d['newkey'] = 'SPAM'
        self.assertRaises(RuntimeError, grow_dict_while_iterating)

        def grow_db_while_iterating():
            for _ in self.f:
                self.f['newkey'] = 'SPAM'
        self.assertRaises(RuntimeError, grow_db_while_iterating)

    def test_iteritems_abort_on_changed_size(self):
        # Changing the number of keys during iteritems() must raise
        # RuntimeError.  The dict case adds a key; the db case deletes
        # the key that was just yielded.
        def grow_dict_while_iterating():
            for _ in self.d.iteritems():
                self.d['newkey'] = 'SPAM'
        self.assertRaises(RuntimeError, grow_dict_while_iterating)

        def shrink_db_while_iterating():
            for key, _ in self.f.iteritems():
                del self.f[key]
        self.assertRaises(RuntimeError, shrink_db_while_iterating)

    def test_iteritems_while_modifying_values(self):
        # Reference behaviour on the plain dict first.
        for k, v in self.d.iteritems():
            self.d[k] = 'modified ' + v

        # The db should behave the same as a dict: modifying values of
        # existing keys should not break iteration (adding or removing
        # keys should).
        loops_left = len(self.f)
        for k, v in self.f.iteritems():
            self.f[k] = 'modified ' + v
            loops_left -= 1
        self.assertEqual(loops_left, 0)

        self.test_mapping_iteration_methods()

    def test_first_next_looping(self):
        # Walk forward with the cursor-style first()/next() methods.
        items = [self.f.first()]
        for _ in xrange(1, len(self.f)):
            items.append(self.f.next())
        self.assertSetEquals(items, self.d.items())

    def test_previous_last_looping(self):
        # Walk backward with the cursor-style last()/previous() methods.
        items = [self.f.last()]
        for _ in xrange(1, len(self.f)):
            items.append(self.f.previous())
        self.assertSetEquals(items, self.d.items())

    def test_first_while_deleting(self):
        # Test for bug 1725856
        self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
        for _ in self.d:
            key = self.f.first()[0]
            del self.f[key]
        self.assertEqual([], self.f.items(), "expected empty db after test")

    def test_last_while_deleting(self):
        # Test for bug 1725856's evil twin
        self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
        for _ in self.d:
            key = self.f.last()[0]
            del self.f[key]
        self.assertEqual([], self.f.items(), "expected empty db after test")

    def test_set_location(self):
        self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))

    def test_contains(self):
        for k in self.d:
            self.assertIn(k, self.f)
        self.assertNotIn('not here', self.f)

    def test_has_key(self):
        for k in self.d:
            self.assertTrue(self.f.has_key(k))
        self.assertFalse(self.f.has_key('not here'))

    def test_clear(self):
        self.f.clear()
        self.assertEqual(len(self.f), 0)

    def test__no_deadlock_first(self, debug=0):
        # Mix cursor/iterator reads with writes; with ``debug`` true,
        # progress markers are written to stdout to show where a deadlock
        # occurs.
        def trace(*parts):
            if debug:
                sys.stdout.write(' '.join(str(p) for p in parts) + '\n')

        # do this so that testers can see what function we're in in
        # verbose mode when we deadlock.
        sys.stdout.flush()

        # in pybsddb's _DBWithCursor this causes an internal DBCursor
        # object to be created.  Other test_ methods in this class could
        # inadvertently cause the deadlock but an explicit test is needed.
        trace("A")
        k, v = self.f.first()
        trace("B", k)
        self.f[k] = "deadlock. do not pass go. do not collect $200."
        trace("C")
        # if the bsddb implementation leaves the DBCursor open during
        # the database write and locking+threading support is enabled
        # the cursor's read lock will deadlock the write lock request..

        # test the iterator interface
        trace("D")
        i = self.f.iteritems()
        k, v = next(i)
        trace("E")
        self.f[k] = "please don't deadlock"
        trace("F")
        # Drain the rest of the iterator.
        for _ in i:
            pass
        trace("F2")

        # The explicit loop is kept (rather than a ``for``) so that the
        # "H" marker is emitted before every next() call, including the
        # one that ends the iteration.
        i = iter(self.f)
        trace("G")
        while i:
            try:
                trace("H")
                k = next(i)
                trace("I")
                self.f[k] = "deadlocks-r-us"
                trace("J")
            except StopIteration:
                i = None
        trace("K")

        # test the legacy cursor interface mixed with writes
        self.assertIn(self.f.first()[0], self.d)
        k = self.f.next()[0]
        self.assertIn(k, self.d)
        self.f[k] = "be gone with ye deadlocks"
        # assertEqual, not assertTrue: with assertTrue the expected value
        # is only used as the failure message and nothing is compared.
        self.assertEqual(self.f[k], "be gone with ye deadlocks")

    def test_for_cursor_memleak(self):
        # do the bsddb._DBWithCursor iterator internals leak cursors?
        nc1 = len(self.f._cursor_refs)
        # create iterator
        i = self.f.iteritems()
        nc2 = len(self.f._cursor_refs)
        # use the iterator (should run to the first yield, creating the cursor)
        next(i)
        nc3 = len(self.f._cursor_refs)
        # destroy the iterator; this should cause the weakref callback
        # to remove the cursor object from self.f._cursor_refs
        del i
        nc4 = len(self.f._cursor_refs)

        self.assertEqual(nc1, nc2)
        self.assertEqual(nc1, nc4)
        self.assertEqual(nc3, nc1 + 1)

    def test_popitem(self):
        k, v = self.f.popitem()
        self.assertIn(k, self.d)
        self.assertIn(v, self.d.values())
        self.assertNotIn(k, self.f)
        self.assertEqual(len(self.d) - 1, len(self.f))

    def test_pop(self):
        k = 'w'
        v = self.f.pop(k)
        self.assertEqual(v, self.d[k])
        self.assertNotIn(k, self.f)
        self.assertNotIn(v, self.f.values())
        self.assertEqual(len(self.d) - 1, len(self.f))

    def test_get(self):
        self.assertIsNone(self.f.get('NotHere'))
        self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
        self.assertEqual(self.f.get('q', 'Default'), self.d['q'])

    def test_setdefault(self):
        self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
        self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])

    def test_update(self):
        new = dict(y='life', u='of', i='brian')
        self.f.update(new)
        self.d.update(new)
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def test_keyordering(self):
        if self.openmethod[0] is not bsddb.btopen:
            # Report as skipped instead of silently passing.
            self.skipTest("key ordering is only checked for btopen databases")
        keys = sorted(self.d)
        self.assertEqual(self.f.first()[0], keys[0])
        self.assertEqual(self.f.next()[0], keys[1])
        self.assertEqual(self.f.last()[0], keys[-1])
        self.assertEqual(self.f.previous()[0], keys[-2])
        self.assertEqual(list(self.f), keys)
321 | \r | |
class TestBTree(TestBSDDB):
    """Run the shared TestBSDDB tests on a ``bsddb.btopen`` database file."""

    # Open function, stored in a list and invoked as ``openmethod[0](...)``.
    openmethod = [bsddb.btopen]
    # Database file name used for this test case.
    fname = test_support.TESTFN
325 | \r | |
class TestBTree_InMemory(TestBSDDB):
    """Run the shared TestBSDDB tests on an in-memory ``bsddb.btopen`` db."""

    # Open function, stored in a list and invoked as ``openmethod[0](...)``.
    openmethod = [bsddb.btopen]
    # No file name: the base class treats ``None`` as an in-memory database.
    fname = None
329 | \r | |
class TestBTree_InMemory_Truncate(TestBSDDB):
    """Run the shared TestBSDDB tests on an in-memory ``bsddb.btopen`` db
    opened with flag ``'n'`` instead of the base class default ``'c'``.
    """

    # Open function, stored in a list and invoked as ``openmethod[0](...)``.
    openmethod = [bsddb.btopen]
    # Overrides TestBSDDB.openflag (presumably "always create a new, empty
    # database", as the class name suggests -- confirm against bsddb docs).
    openflag = 'n'
    # No file name: the base class treats ``None`` as an in-memory database.
    fname = None
334 | \r | |
class TestHashTable(TestBSDDB):
    """Run the shared TestBSDDB tests on a ``bsddb.hashopen`` database file."""

    # Open function, stored in a list and invoked as ``openmethod[0](...)``.
    openmethod = [bsddb.hashopen]
    # Database file name used for this test case.
    fname = test_support.TESTFN
338 | \r | |
class TestHashTable_InMemory(TestBSDDB):
    """Run the shared TestBSDDB tests on an in-memory ``bsddb.hashopen`` db."""

    # Open function, stored in a list and invoked as ``openmethod[0](...)``.
    openmethod = [bsddb.hashopen]
    # No file name: the base class treats ``None`` as an in-memory database.
    fname = None
342 | \r | |
343 | ## # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85\r | |
344 | ## # appears broken... at least on\r | |
345 | ## # Solaris Intel - rmasse 1/97\r | |
346 | \r | |
def test_main(verbose=None):
    """Run every bsddb test case class via ``test_support.run_unittest``.

    ``verbose`` is accepted but is not used by this function.
    """
    test_cases = (
        TestBTree,
        TestHashTable,
        TestBTree_InMemory,
        TestHashTable_InMemory,
        TestBTree_InMemory_Truncate,
    )
    test_support.run_unittest(*test_cases)


# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)