]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | import os\r |
2 | import sys\r | |
3 | import time\r | |
4 | import stat\r | |
5 | import socket\r | |
6 | import email\r | |
7 | import email.message\r | |
8 | import re\r | |
9 | import StringIO\r | |
10 | from test import test_support\r | |
11 | import unittest\r | |
12 | import mailbox\r | |
13 | import glob\r | |
14 | try:\r | |
15 | import fcntl\r | |
16 | except ImportError:\r | |
17 | pass\r | |
18 | \r | |
19 | # Silence Py3k warning\r | |
20 | rfc822 = test_support.import_module('rfc822', deprecated=True)\r | |
21 | \r | |
22 | class TestBase(unittest.TestCase):\r | |
23 | \r | |
24 | def _check_sample(self, msg):\r | |
25 | # Inspect a mailbox.Message representation of the sample message\r | |
26 | self.assertIsInstance(msg, email.message.Message)\r | |
27 | self.assertIsInstance(msg, mailbox.Message)\r | |
28 | for key, value in _sample_headers.iteritems():\r | |
29 | self.assertIn(value, msg.get_all(key))\r | |
30 | self.assertTrue(msg.is_multipart())\r | |
31 | self.assertEqual(len(msg.get_payload()), len(_sample_payloads))\r | |
32 | for i, payload in enumerate(_sample_payloads):\r | |
33 | part = msg.get_payload(i)\r | |
34 | self.assertIsInstance(part, email.message.Message)\r | |
35 | self.assertNotIsInstance(part, mailbox.Message)\r | |
36 | self.assertEqual(part.get_payload(), payload)\r | |
37 | \r | |
38 | def _delete_recursively(self, target):\r | |
39 | # Delete a file or delete a directory recursively\r | |
40 | if os.path.isdir(target):\r | |
41 | for path, dirs, files in os.walk(target, topdown=False):\r | |
42 | for name in files:\r | |
43 | os.remove(os.path.join(path, name))\r | |
44 | for name in dirs:\r | |
45 | os.rmdir(os.path.join(path, name))\r | |
46 | os.rmdir(target)\r | |
47 | elif os.path.exists(target):\r | |
48 | os.remove(target)\r | |
49 | \r | |
50 | \r | |
51 | class TestMailbox(TestBase):\r | |
52 | \r | |
53 | _factory = None # Overridden by subclasses to reuse tests\r | |
54 | _template = 'From: foo\n\n%s'\r | |
55 | \r | |
56 | def setUp(self):\r | |
57 | self._path = test_support.TESTFN\r | |
58 | self._delete_recursively(self._path)\r | |
59 | self._box = self._factory(self._path)\r | |
60 | \r | |
61 | def tearDown(self):\r | |
62 | self._box.close()\r | |
63 | self._delete_recursively(self._path)\r | |
64 | \r | |
65 | def test_add(self):\r | |
66 | # Add copies of a sample message\r | |
67 | keys = []\r | |
68 | keys.append(self._box.add(self._template % 0))\r | |
69 | self.assertEqual(len(self._box), 1)\r | |
70 | keys.append(self._box.add(mailbox.Message(_sample_message)))\r | |
71 | self.assertEqual(len(self._box), 2)\r | |
72 | keys.append(self._box.add(email.message_from_string(_sample_message)))\r | |
73 | self.assertEqual(len(self._box), 3)\r | |
74 | keys.append(self._box.add(StringIO.StringIO(_sample_message)))\r | |
75 | self.assertEqual(len(self._box), 4)\r | |
76 | keys.append(self._box.add(_sample_message))\r | |
77 | self.assertEqual(len(self._box), 5)\r | |
78 | self.assertEqual(self._box.get_string(keys[0]), self._template % 0)\r | |
79 | for i in (1, 2, 3, 4):\r | |
80 | self._check_sample(self._box[keys[i]])\r | |
81 | \r | |
82 | def test_remove(self):\r | |
83 | # Remove messages using remove()\r | |
84 | self._test_remove_or_delitem(self._box.remove)\r | |
85 | \r | |
86 | def test_delitem(self):\r | |
87 | # Remove messages using __delitem__()\r | |
88 | self._test_remove_or_delitem(self._box.__delitem__)\r | |
89 | \r | |
90 | def _test_remove_or_delitem(self, method):\r | |
91 | # (Used by test_remove() and test_delitem().)\r | |
92 | key0 = self._box.add(self._template % 0)\r | |
93 | key1 = self._box.add(self._template % 1)\r | |
94 | self.assertEqual(len(self._box), 2)\r | |
95 | method(key0)\r | |
96 | l = len(self._box)\r | |
97 | self.assertEqual(l, 1)\r | |
98 | self.assertRaises(KeyError, lambda: self._box[key0])\r | |
99 | self.assertRaises(KeyError, lambda: method(key0))\r | |
100 | self.assertEqual(self._box.get_string(key1), self._template % 1)\r | |
101 | key2 = self._box.add(self._template % 2)\r | |
102 | self.assertEqual(len(self._box), 2)\r | |
103 | method(key2)\r | |
104 | l = len(self._box)\r | |
105 | self.assertEqual(l, 1)\r | |
106 | self.assertRaises(KeyError, lambda: self._box[key2])\r | |
107 | self.assertRaises(KeyError, lambda: method(key2))\r | |
108 | self.assertEqual(self._box.get_string(key1), self._template % 1)\r | |
109 | method(key1)\r | |
110 | self.assertEqual(len(self._box), 0)\r | |
111 | self.assertRaises(KeyError, lambda: self._box[key1])\r | |
112 | self.assertRaises(KeyError, lambda: method(key1))\r | |
113 | \r | |
114 | def test_discard(self, repetitions=10):\r | |
115 | # Discard messages\r | |
116 | key0 = self._box.add(self._template % 0)\r | |
117 | key1 = self._box.add(self._template % 1)\r | |
118 | self.assertEqual(len(self._box), 2)\r | |
119 | self._box.discard(key0)\r | |
120 | self.assertEqual(len(self._box), 1)\r | |
121 | self.assertRaises(KeyError, lambda: self._box[key0])\r | |
122 | self._box.discard(key0)\r | |
123 | self.assertEqual(len(self._box), 1)\r | |
124 | self.assertRaises(KeyError, lambda: self._box[key0])\r | |
125 | \r | |
126 | def test_get(self):\r | |
127 | # Retrieve messages using get()\r | |
128 | key0 = self._box.add(self._template % 0)\r | |
129 | msg = self._box.get(key0)\r | |
130 | self.assertEqual(msg['from'], 'foo')\r | |
131 | self.assertEqual(msg.get_payload(), '0')\r | |
132 | self.assertIs(self._box.get('foo'), None)\r | |
133 | self.assertFalse(self._box.get('foo', False))\r | |
134 | self._box.close()\r | |
135 | self._box = self._factory(self._path, factory=rfc822.Message)\r | |
136 | key1 = self._box.add(self._template % 1)\r | |
137 | msg = self._box.get(key1)\r | |
138 | self.assertEqual(msg['from'], 'foo')\r | |
139 | self.assertEqual(msg.fp.read(), '1')\r | |
140 | \r | |
141 | def test_getitem(self):\r | |
142 | # Retrieve message using __getitem__()\r | |
143 | key0 = self._box.add(self._template % 0)\r | |
144 | msg = self._box[key0]\r | |
145 | self.assertEqual(msg['from'], 'foo')\r | |
146 | self.assertEqual(msg.get_payload(), '0')\r | |
147 | self.assertRaises(KeyError, lambda: self._box['foo'])\r | |
148 | self._box.discard(key0)\r | |
149 | self.assertRaises(KeyError, lambda: self._box[key0])\r | |
150 | \r | |
151 | def test_get_message(self):\r | |
152 | # Get Message representations of messages\r | |
153 | key0 = self._box.add(self._template % 0)\r | |
154 | key1 = self._box.add(_sample_message)\r | |
155 | msg0 = self._box.get_message(key0)\r | |
156 | self.assertIsInstance(msg0, mailbox.Message)\r | |
157 | self.assertEqual(msg0['from'], 'foo')\r | |
158 | self.assertEqual(msg0.get_payload(), '0')\r | |
159 | self._check_sample(self._box.get_message(key1))\r | |
160 | \r | |
161 | def test_get_string(self):\r | |
162 | # Get string representations of messages\r | |
163 | key0 = self._box.add(self._template % 0)\r | |
164 | key1 = self._box.add(_sample_message)\r | |
165 | self.assertEqual(self._box.get_string(key0), self._template % 0)\r | |
166 | self.assertEqual(self._box.get_string(key1), _sample_message)\r | |
167 | \r | |
168 | def test_get_file(self):\r | |
169 | # Get file representations of messages\r | |
170 | key0 = self._box.add(self._template % 0)\r | |
171 | key1 = self._box.add(_sample_message)\r | |
172 | self.assertEqual(self._box.get_file(key0).read().replace(os.linesep, '\n'),\r | |
173 | self._template % 0)\r | |
174 | self.assertEqual(self._box.get_file(key1).read().replace(os.linesep, '\n'),\r | |
175 | _sample_message)\r | |
176 | \r | |
177 | def test_iterkeys(self):\r | |
178 | # Get keys using iterkeys()\r | |
179 | self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)\r | |
180 | \r | |
181 | def test_keys(self):\r | |
182 | # Get keys using keys()\r | |
183 | self._check_iteration(self._box.keys, do_keys=True, do_values=False)\r | |
184 | \r | |
185 | def test_itervalues(self):\r | |
186 | # Get values using itervalues()\r | |
187 | self._check_iteration(self._box.itervalues, do_keys=False,\r | |
188 | do_values=True)\r | |
189 | \r | |
190 | def test_iter(self):\r | |
191 | # Get values using __iter__()\r | |
192 | self._check_iteration(self._box.__iter__, do_keys=False,\r | |
193 | do_values=True)\r | |
194 | \r | |
195 | def test_values(self):\r | |
196 | # Get values using values()\r | |
197 | self._check_iteration(self._box.values, do_keys=False, do_values=True)\r | |
198 | \r | |
199 | def test_iteritems(self):\r | |
200 | # Get keys and values using iteritems()\r | |
201 | self._check_iteration(self._box.iteritems, do_keys=True,\r | |
202 | do_values=True)\r | |
203 | \r | |
204 | def test_items(self):\r | |
205 | # Get keys and values using items()\r | |
206 | self._check_iteration(self._box.items, do_keys=True, do_values=True)\r | |
207 | \r | |
208 | def _check_iteration(self, method, do_keys, do_values, repetitions=10):\r | |
209 | for value in method():\r | |
210 | self.fail("Not empty")\r | |
211 | keys, values = [], []\r | |
212 | for i in xrange(repetitions):\r | |
213 | keys.append(self._box.add(self._template % i))\r | |
214 | values.append(self._template % i)\r | |
215 | if do_keys and not do_values:\r | |
216 | returned_keys = list(method())\r | |
217 | elif do_values and not do_keys:\r | |
218 | returned_values = list(method())\r | |
219 | else:\r | |
220 | returned_keys, returned_values = [], []\r | |
221 | for key, value in method():\r | |
222 | returned_keys.append(key)\r | |
223 | returned_values.append(value)\r | |
224 | if do_keys:\r | |
225 | self.assertEqual(len(keys), len(returned_keys))\r | |
226 | self.assertEqual(set(keys), set(returned_keys))\r | |
227 | if do_values:\r | |
228 | count = 0\r | |
229 | for value in returned_values:\r | |
230 | self.assertEqual(value['from'], 'foo')\r | |
231 | self.assertTrue(int(value.get_payload()) < repetitions,\r | |
232 | (value.get_payload(), repetitions))\r | |
233 | count += 1\r | |
234 | self.assertEqual(len(values), count)\r | |
235 | \r | |
236 | def test_has_key(self):\r | |
237 | # Check existence of keys using has_key()\r | |
238 | self._test_has_key_or_contains(self._box.has_key)\r | |
239 | \r | |
240 | def test_contains(self):\r | |
241 | # Check existence of keys using __contains__()\r | |
242 | self._test_has_key_or_contains(self._box.__contains__)\r | |
243 | \r | |
244 | def _test_has_key_or_contains(self, method):\r | |
245 | # (Used by test_has_key() and test_contains().)\r | |
246 | self.assertFalse(method('foo'))\r | |
247 | key0 = self._box.add(self._template % 0)\r | |
248 | self.assertTrue(method(key0))\r | |
249 | self.assertFalse(method('foo'))\r | |
250 | key1 = self._box.add(self._template % 1)\r | |
251 | self.assertTrue(method(key1))\r | |
252 | self.assertTrue(method(key0))\r | |
253 | self.assertFalse(method('foo'))\r | |
254 | self._box.remove(key0)\r | |
255 | self.assertFalse(method(key0))\r | |
256 | self.assertTrue(method(key1))\r | |
257 | self.assertFalse(method('foo'))\r | |
258 | self._box.remove(key1)\r | |
259 | self.assertFalse(method(key1))\r | |
260 | self.assertFalse(method(key0))\r | |
261 | self.assertFalse(method('foo'))\r | |
262 | \r | |
263 | def test_len(self, repetitions=10):\r | |
264 | # Get message count\r | |
265 | keys = []\r | |
266 | for i in xrange(repetitions):\r | |
267 | self.assertEqual(len(self._box), i)\r | |
268 | keys.append(self._box.add(self._template % i))\r | |
269 | self.assertEqual(len(self._box), i + 1)\r | |
270 | for i in xrange(repetitions):\r | |
271 | self.assertEqual(len(self._box), repetitions - i)\r | |
272 | self._box.remove(keys[i])\r | |
273 | self.assertEqual(len(self._box), repetitions - i - 1)\r | |
274 | \r | |
275 | def test_set_item(self):\r | |
276 | # Modify messages using __setitem__()\r | |
277 | key0 = self._box.add(self._template % 'original 0')\r | |
278 | self.assertEqual(self._box.get_string(key0),\r | |
279 | self._template % 'original 0')\r | |
280 | key1 = self._box.add(self._template % 'original 1')\r | |
281 | self.assertEqual(self._box.get_string(key1),\r | |
282 | self._template % 'original 1')\r | |
283 | self._box[key0] = self._template % 'changed 0'\r | |
284 | self.assertEqual(self._box.get_string(key0),\r | |
285 | self._template % 'changed 0')\r | |
286 | self._box[key1] = self._template % 'changed 1'\r | |
287 | self.assertEqual(self._box.get_string(key1),\r | |
288 | self._template % 'changed 1')\r | |
289 | self._box[key0] = _sample_message\r | |
290 | self._check_sample(self._box[key0])\r | |
291 | self._box[key1] = self._box[key0]\r | |
292 | self._check_sample(self._box[key1])\r | |
293 | self._box[key0] = self._template % 'original 0'\r | |
294 | self.assertEqual(self._box.get_string(key0),\r | |
295 | self._template % 'original 0')\r | |
296 | self._check_sample(self._box[key1])\r | |
297 | self.assertRaises(KeyError,\r | |
298 | lambda: self._box.__setitem__('foo', 'bar'))\r | |
299 | self.assertRaises(KeyError, lambda: self._box['foo'])\r | |
300 | self.assertEqual(len(self._box), 2)\r | |
301 | \r | |
302 | def test_clear(self, iterations=10):\r | |
303 | # Remove all messages using clear()\r | |
304 | keys = []\r | |
305 | for i in xrange(iterations):\r | |
306 | self._box.add(self._template % i)\r | |
307 | for i, key in enumerate(keys):\r | |
308 | self.assertEqual(self._box.get_string(key), self._template % i)\r | |
309 | self._box.clear()\r | |
310 | self.assertEqual(len(self._box), 0)\r | |
311 | for i, key in enumerate(keys):\r | |
312 | self.assertRaises(KeyError, lambda: self._box.get_string(key))\r | |
313 | \r | |
314 | def test_pop(self):\r | |
315 | # Get and remove a message using pop()\r | |
316 | key0 = self._box.add(self._template % 0)\r | |
317 | self.assertIn(key0, self._box)\r | |
318 | key1 = self._box.add(self._template % 1)\r | |
319 | self.assertIn(key1, self._box)\r | |
320 | self.assertEqual(self._box.pop(key0).get_payload(), '0')\r | |
321 | self.assertNotIn(key0, self._box)\r | |
322 | self.assertIn(key1, self._box)\r | |
323 | key2 = self._box.add(self._template % 2)\r | |
324 | self.assertIn(key2, self._box)\r | |
325 | self.assertEqual(self._box.pop(key2).get_payload(), '2')\r | |
326 | self.assertNotIn(key2, self._box)\r | |
327 | self.assertIn(key1, self._box)\r | |
328 | self.assertEqual(self._box.pop(key1).get_payload(), '1')\r | |
329 | self.assertNotIn(key1, self._box)\r | |
330 | self.assertEqual(len(self._box), 0)\r | |
331 | \r | |
332 | def test_popitem(self, iterations=10):\r | |
333 | # Get and remove an arbitrary (key, message) using popitem()\r | |
334 | keys = []\r | |
335 | for i in xrange(10):\r | |
336 | keys.append(self._box.add(self._template % i))\r | |
337 | seen = []\r | |
338 | for i in xrange(10):\r | |
339 | key, msg = self._box.popitem()\r | |
340 | self.assertIn(key, keys)\r | |
341 | self.assertNotIn(key, seen)\r | |
342 | seen.append(key)\r | |
343 | self.assertEqual(int(msg.get_payload()), keys.index(key))\r | |
344 | self.assertEqual(len(self._box), 0)\r | |
345 | for key in keys:\r | |
346 | self.assertRaises(KeyError, lambda: self._box[key])\r | |
347 | \r | |
348 | def test_update(self):\r | |
349 | # Modify multiple messages using update()\r | |
350 | key0 = self._box.add(self._template % 'original 0')\r | |
351 | key1 = self._box.add(self._template % 'original 1')\r | |
352 | key2 = self._box.add(self._template % 'original 2')\r | |
353 | self._box.update({key0: self._template % 'changed 0',\r | |
354 | key2: _sample_message})\r | |
355 | self.assertEqual(len(self._box), 3)\r | |
356 | self.assertEqual(self._box.get_string(key0),\r | |
357 | self._template % 'changed 0')\r | |
358 | self.assertEqual(self._box.get_string(key1),\r | |
359 | self._template % 'original 1')\r | |
360 | self._check_sample(self._box[key2])\r | |
361 | self._box.update([(key2, self._template % 'changed 2'),\r | |
362 | (key1, self._template % 'changed 1'),\r | |
363 | (key0, self._template % 'original 0')])\r | |
364 | self.assertEqual(len(self._box), 3)\r | |
365 | self.assertEqual(self._box.get_string(key0),\r | |
366 | self._template % 'original 0')\r | |
367 | self.assertEqual(self._box.get_string(key1),\r | |
368 | self._template % 'changed 1')\r | |
369 | self.assertEqual(self._box.get_string(key2),\r | |
370 | self._template % 'changed 2')\r | |
371 | self.assertRaises(KeyError,\r | |
372 | lambda: self._box.update({'foo': 'bar',\r | |
373 | key0: self._template % "changed 0"}))\r | |
374 | self.assertEqual(len(self._box), 3)\r | |
375 | self.assertEqual(self._box.get_string(key0),\r | |
376 | self._template % "changed 0")\r | |
377 | self.assertEqual(self._box.get_string(key1),\r | |
378 | self._template % "changed 1")\r | |
379 | self.assertEqual(self._box.get_string(key2),\r | |
380 | self._template % "changed 2")\r | |
381 | \r | |
382 | def test_flush(self):\r | |
383 | # Write changes to disk\r | |
384 | self._test_flush_or_close(self._box.flush, True)\r | |
385 | \r | |
386 | def test_lock_unlock(self):\r | |
387 | # Lock and unlock the mailbox\r | |
388 | self.assertFalse(os.path.exists(self._get_lock_path()))\r | |
389 | self._box.lock()\r | |
390 | self.assertTrue(os.path.exists(self._get_lock_path()))\r | |
391 | self._box.unlock()\r | |
392 | self.assertFalse(os.path.exists(self._get_lock_path()))\r | |
393 | \r | |
394 | def test_close(self):\r | |
395 | # Close mailbox and flush changes to disk\r | |
396 | self._test_flush_or_close(self._box.close, False)\r | |
397 | \r | |
398 | def _test_flush_or_close(self, method, should_call_close):\r | |
399 | contents = [self._template % i for i in xrange(3)]\r | |
400 | self._box.add(contents[0])\r | |
401 | self._box.add(contents[1])\r | |
402 | self._box.add(contents[2])\r | |
403 | method()\r | |
404 | if should_call_close:\r | |
405 | self._box.close()\r | |
406 | self._box = self._factory(self._path)\r | |
407 | keys = self._box.keys()\r | |
408 | self.assertEqual(len(keys), 3)\r | |
409 | for key in keys:\r | |
410 | self.assertIn(self._box.get_string(key), contents)\r | |
411 | \r | |
412 | def test_dump_message(self):\r | |
413 | # Write message representations to disk\r | |
414 | for input in (email.message_from_string(_sample_message),\r | |
415 | _sample_message, StringIO.StringIO(_sample_message)):\r | |
416 | output = StringIO.StringIO()\r | |
417 | self._box._dump_message(input, output)\r | |
418 | self.assertEqual(output.getvalue(),\r | |
419 | _sample_message.replace('\n', os.linesep))\r | |
420 | output = StringIO.StringIO()\r | |
421 | self.assertRaises(TypeError,\r | |
422 | lambda: self._box._dump_message(None, output))\r | |
423 | \r | |
424 | def _get_lock_path(self):\r | |
425 | # Return the path of the dot lock file. May be overridden.\r | |
426 | return self._path + '.lock'\r | |
427 | \r | |
428 | \r | |
429 | class TestMailboxSuperclass(TestBase):\r | |
430 | \r | |
431 | def test_notimplemented(self):\r | |
432 | # Test that all Mailbox methods raise NotImplementedException.\r | |
433 | box = mailbox.Mailbox('path')\r | |
434 | self.assertRaises(NotImplementedError, lambda: box.add(''))\r | |
435 | self.assertRaises(NotImplementedError, lambda: box.remove(''))\r | |
436 | self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))\r | |
437 | self.assertRaises(NotImplementedError, lambda: box.discard(''))\r | |
438 | self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))\r | |
439 | self.assertRaises(NotImplementedError, lambda: box.iterkeys())\r | |
440 | self.assertRaises(NotImplementedError, lambda: box.keys())\r | |
441 | self.assertRaises(NotImplementedError, lambda: box.itervalues().next())\r | |
442 | self.assertRaises(NotImplementedError, lambda: box.__iter__().next())\r | |
443 | self.assertRaises(NotImplementedError, lambda: box.values())\r | |
444 | self.assertRaises(NotImplementedError, lambda: box.iteritems().next())\r | |
445 | self.assertRaises(NotImplementedError, lambda: box.items())\r | |
446 | self.assertRaises(NotImplementedError, lambda: box.get(''))\r | |
447 | self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))\r | |
448 | self.assertRaises(NotImplementedError, lambda: box.get_message(''))\r | |
449 | self.assertRaises(NotImplementedError, lambda: box.get_string(''))\r | |
450 | self.assertRaises(NotImplementedError, lambda: box.get_file(''))\r | |
451 | self.assertRaises(NotImplementedError, lambda: box.has_key(''))\r | |
452 | self.assertRaises(NotImplementedError, lambda: box.__contains__(''))\r | |
453 | self.assertRaises(NotImplementedError, lambda: box.__len__())\r | |
454 | self.assertRaises(NotImplementedError, lambda: box.clear())\r | |
455 | self.assertRaises(NotImplementedError, lambda: box.pop(''))\r | |
456 | self.assertRaises(NotImplementedError, lambda: box.popitem())\r | |
457 | self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))\r | |
458 | self.assertRaises(NotImplementedError, lambda: box.flush())\r | |
459 | self.assertRaises(NotImplementedError, lambda: box.lock())\r | |
460 | self.assertRaises(NotImplementedError, lambda: box.unlock())\r | |
461 | self.assertRaises(NotImplementedError, lambda: box.close())\r | |
462 | \r | |
463 | \r | |
464 | class TestMaildir(TestMailbox):\r | |
465 | \r | |
466 | _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)\r | |
467 | \r | |
468 | def setUp(self):\r | |
469 | TestMailbox.setUp(self)\r | |
470 | if os.name in ('nt', 'os2') or sys.platform == 'cygwin':\r | |
471 | self._box.colon = '!'\r | |
472 | \r | |
473 | def test_add_MM(self):\r | |
474 | # Add a MaildirMessage instance\r | |
475 | msg = mailbox.MaildirMessage(self._template % 0)\r | |
476 | msg.set_subdir('cur')\r | |
477 | msg.set_info('foo')\r | |
478 | key = self._box.add(msg)\r | |
479 | self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %\r | |
480 | (key, self._box.colon))))\r | |
481 | \r | |
482 | def test_get_MM(self):\r | |
483 | # Get a MaildirMessage instance\r | |
484 | msg = mailbox.MaildirMessage(self._template % 0)\r | |
485 | msg.set_subdir('cur')\r | |
486 | msg.set_flags('RF')\r | |
487 | key = self._box.add(msg)\r | |
488 | msg_returned = self._box.get_message(key)\r | |
489 | self.assertIsInstance(msg_returned, mailbox.MaildirMessage)\r | |
490 | self.assertEqual(msg_returned.get_subdir(), 'cur')\r | |
491 | self.assertEqual(msg_returned.get_flags(), 'FR')\r | |
492 | \r | |
493 | def test_set_MM(self):\r | |
494 | # Set with a MaildirMessage instance\r | |
495 | msg0 = mailbox.MaildirMessage(self._template % 0)\r | |
496 | msg0.set_flags('TP')\r | |
497 | key = self._box.add(msg0)\r | |
498 | msg_returned = self._box.get_message(key)\r | |
499 | self.assertEqual(msg_returned.get_subdir(), 'new')\r | |
500 | self.assertEqual(msg_returned.get_flags(), 'PT')\r | |
501 | msg1 = mailbox.MaildirMessage(self._template % 1)\r | |
502 | self._box[key] = msg1\r | |
503 | msg_returned = self._box.get_message(key)\r | |
504 | self.assertEqual(msg_returned.get_subdir(), 'new')\r | |
505 | self.assertEqual(msg_returned.get_flags(), '')\r | |
506 | self.assertEqual(msg_returned.get_payload(), '1')\r | |
507 | msg2 = mailbox.MaildirMessage(self._template % 2)\r | |
508 | msg2.set_info('2,S')\r | |
509 | self._box[key] = msg2\r | |
510 | self._box[key] = self._template % 3\r | |
511 | msg_returned = self._box.get_message(key)\r | |
512 | self.assertEqual(msg_returned.get_subdir(), 'new')\r | |
513 | self.assertEqual(msg_returned.get_flags(), 'S')\r | |
514 | self.assertEqual(msg_returned.get_payload(), '3')\r | |
515 | \r | |
516 | def test_consistent_factory(self):\r | |
517 | # Add a message.\r | |
518 | msg = mailbox.MaildirMessage(self._template % 0)\r | |
519 | msg.set_subdir('cur')\r | |
520 | msg.set_flags('RF')\r | |
521 | key = self._box.add(msg)\r | |
522 | \r | |
523 | # Create new mailbox with\r | |
524 | class FakeMessage(mailbox.MaildirMessage):\r | |
525 | pass\r | |
526 | box = mailbox.Maildir(self._path, factory=FakeMessage)\r | |
527 | box.colon = self._box.colon\r | |
528 | msg2 = box.get_message(key)\r | |
529 | self.assertIsInstance(msg2, FakeMessage)\r | |
530 | \r | |
531 | def test_initialize_new(self):\r | |
532 | # Initialize a non-existent mailbox\r | |
533 | self.tearDown()\r | |
534 | self._box = mailbox.Maildir(self._path)\r | |
535 | self._check_basics(factory=rfc822.Message)\r | |
536 | self._delete_recursively(self._path)\r | |
537 | self._box = self._factory(self._path, factory=None)\r | |
538 | self._check_basics()\r | |
539 | \r | |
540 | def test_initialize_existing(self):\r | |
541 | # Initialize an existing mailbox\r | |
542 | self.tearDown()\r | |
543 | for subdir in '', 'tmp', 'new', 'cur':\r | |
544 | os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))\r | |
545 | self._box = mailbox.Maildir(self._path)\r | |
546 | self._check_basics(factory=rfc822.Message)\r | |
547 | self._box = mailbox.Maildir(self._path, factory=None)\r | |
548 | self._check_basics()\r | |
549 | \r | |
550 | def _check_basics(self, factory=None):\r | |
551 | # (Used by test_open_new() and test_open_existing().)\r | |
552 | self.assertEqual(self._box._path, os.path.abspath(self._path))\r | |
553 | self.assertEqual(self._box._factory, factory)\r | |
554 | for subdir in '', 'tmp', 'new', 'cur':\r | |
555 | path = os.path.join(self._path, subdir)\r | |
556 | mode = os.stat(path)[stat.ST_MODE]\r | |
557 | self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)\r | |
558 | \r | |
559 | def test_list_folders(self):\r | |
560 | # List folders\r | |
561 | self._box.add_folder('one')\r | |
562 | self._box.add_folder('two')\r | |
563 | self._box.add_folder('three')\r | |
564 | self.assertEqual(len(self._box.list_folders()), 3)\r | |
565 | self.assertEqual(set(self._box.list_folders()),\r | |
566 | set(('one', 'two', 'three')))\r | |
567 | \r | |
568 | def test_get_folder(self):\r | |
569 | # Open folders\r | |
570 | self._box.add_folder('foo.bar')\r | |
571 | folder0 = self._box.get_folder('foo.bar')\r | |
572 | folder0.add(self._template % 'bar')\r | |
573 | self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))\r | |
574 | folder1 = self._box.get_folder('foo.bar')\r | |
575 | self.assertEqual(folder1.get_string(folder1.keys()[0]),\r | |
576 | self._template % 'bar')\r | |
577 | \r | |
578 | def test_add_and_remove_folders(self):\r | |
579 | # Delete folders\r | |
580 | self._box.add_folder('one')\r | |
581 | self._box.add_folder('two')\r | |
582 | self.assertEqual(len(self._box.list_folders()), 2)\r | |
583 | self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))\r | |
584 | self._box.remove_folder('one')\r | |
585 | self.assertEqual(len(self._box.list_folders()), 1)\r | |
586 | self.assertEqual(set(self._box.list_folders()), set(('two',)))\r | |
587 | self._box.add_folder('three')\r | |
588 | self.assertEqual(len(self._box.list_folders()), 2)\r | |
589 | self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))\r | |
590 | self._box.remove_folder('three')\r | |
591 | self.assertEqual(len(self._box.list_folders()), 1)\r | |
592 | self.assertEqual(set(self._box.list_folders()), set(('two',)))\r | |
593 | self._box.remove_folder('two')\r | |
594 | self.assertEqual(len(self._box.list_folders()), 0)\r | |
595 | self.assertEqual(self._box.list_folders(), [])\r | |
596 | \r | |
597 | def test_clean(self):\r | |
598 | # Remove old files from 'tmp'\r | |
599 | foo_path = os.path.join(self._path, 'tmp', 'foo')\r | |
600 | bar_path = os.path.join(self._path, 'tmp', 'bar')\r | |
601 | with open(foo_path, 'w') as f:\r | |
602 | f.write("@")\r | |
603 | with open(bar_path, 'w') as f:\r | |
604 | f.write("@")\r | |
605 | self._box.clean()\r | |
606 | self.assertTrue(os.path.exists(foo_path))\r | |
607 | self.assertTrue(os.path.exists(bar_path))\r | |
608 | foo_stat = os.stat(foo_path)\r | |
609 | os.utime(foo_path, (time.time() - 129600 - 2,\r | |
610 | foo_stat.st_mtime))\r | |
611 | self._box.clean()\r | |
612 | self.assertFalse(os.path.exists(foo_path))\r | |
613 | self.assertTrue(os.path.exists(bar_path))\r | |
614 | \r | |
615 | def test_create_tmp(self, repetitions=10):\r | |
616 | # Create files in tmp directory\r | |
617 | hostname = socket.gethostname()\r | |
618 | if '/' in hostname:\r | |
619 | hostname = hostname.replace('/', r'\057')\r | |
620 | if ':' in hostname:\r | |
621 | hostname = hostname.replace(':', r'\072')\r | |
622 | pid = os.getpid()\r | |
623 | pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"\r | |
624 | r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")\r | |
625 | previous_groups = None\r | |
626 | for x in xrange(repetitions):\r | |
627 | tmp_file = self._box._create_tmp()\r | |
628 | head, tail = os.path.split(tmp_file.name)\r | |
629 | self.assertEqual(head, os.path.abspath(os.path.join(self._path,\r | |
630 | "tmp")),\r | |
631 | "File in wrong location: '%s'" % head)\r | |
632 | match = pattern.match(tail)\r | |
633 | self.assertTrue(match is not None, "Invalid file name: '%s'" % tail)\r | |
634 | groups = match.groups()\r | |
635 | if previous_groups is not None:\r | |
636 | self.assertTrue(int(groups[0] >= previous_groups[0]),\r | |
637 | "Non-monotonic seconds: '%s' before '%s'" %\r | |
638 | (previous_groups[0], groups[0]))\r | |
639 | self.assertTrue(int(groups[1] >= previous_groups[1]) or\r | |
640 | groups[0] != groups[1],\r | |
641 | "Non-monotonic milliseconds: '%s' before '%s'" %\r | |
642 | (previous_groups[1], groups[1]))\r | |
643 | self.assertTrue(int(groups[2]) == pid,\r | |
644 | "Process ID mismatch: '%s' should be '%s'" %\r | |
645 | (groups[2], pid))\r | |
646 | self.assertTrue(int(groups[3]) == int(previous_groups[3]) + 1,\r | |
647 | "Non-sequential counter: '%s' before '%s'" %\r | |
648 | (previous_groups[3], groups[3]))\r | |
649 | self.assertTrue(groups[4] == hostname,\r | |
650 | "Host name mismatch: '%s' should be '%s'" %\r | |
651 | (groups[4], hostname))\r | |
652 | previous_groups = groups\r | |
653 | tmp_file.write(_sample_message)\r | |
654 | tmp_file.seek(0)\r | |
655 | self.assertTrue(tmp_file.read() == _sample_message)\r | |
656 | tmp_file.close()\r | |
657 | file_count = len(os.listdir(os.path.join(self._path, "tmp")))\r | |
658 | self.assertTrue(file_count == repetitions,\r | |
659 | "Wrong file count: '%s' should be '%s'" %\r | |
660 | (file_count, repetitions))\r | |
661 | \r | |
662 | def test_refresh(self):\r | |
663 | # Update the table of contents\r | |
664 | self.assertEqual(self._box._toc, {})\r | |
665 | key0 = self._box.add(self._template % 0)\r | |
666 | key1 = self._box.add(self._template % 1)\r | |
667 | self.assertEqual(self._box._toc, {})\r | |
668 | self._box._refresh()\r | |
669 | self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),\r | |
670 | key1: os.path.join('new', key1)})\r | |
671 | key2 = self._box.add(self._template % 2)\r | |
672 | self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),\r | |
673 | key1: os.path.join('new', key1)})\r | |
674 | self._box._refresh()\r | |
675 | self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),\r | |
676 | key1: os.path.join('new', key1),\r | |
677 | key2: os.path.join('new', key2)})\r | |
678 | \r | |
679 | def test_lookup(self):\r | |
680 | # Look up message subpaths in the TOC\r | |
681 | self.assertRaises(KeyError, lambda: self._box._lookup('foo'))\r | |
682 | key0 = self._box.add(self._template % 0)\r | |
683 | self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))\r | |
684 | os.remove(os.path.join(self._path, 'new', key0))\r | |
685 | self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})\r | |
686 | # Be sure that the TOC is read back from disk (see issue #6896\r | |
687 | # about bad mtime behaviour on some systems).\r | |
688 | self._box.flush()\r | |
689 | self.assertRaises(KeyError, lambda: self._box._lookup(key0))\r | |
690 | self.assertEqual(self._box._toc, {})\r | |
691 | \r | |
692 | def test_lock_unlock(self):\r | |
693 | # Lock and unlock the mailbox. For Maildir, this does nothing.\r | |
694 | self._box.lock()\r | |
695 | self._box.unlock()\r | |
696 | \r | |
697 | def test_folder (self):\r | |
698 | # Test for bug #1569790: verify that folders returned by .get_folder()\r | |
699 | # use the same factory function.\r | |
700 | def dummy_factory (s):\r | |
701 | return None\r | |
702 | box = self._factory(self._path, factory=dummy_factory)\r | |
703 | folder = box.add_folder('folder1')\r | |
704 | self.assertIs(folder._factory, dummy_factory)\r | |
705 | \r | |
706 | folder1_alias = box.get_folder('folder1')\r | |
707 | self.assertIs(folder1_alias._factory, dummy_factory)\r | |
708 | \r | |
709 | def test_directory_in_folder (self):\r | |
710 | # Test that mailboxes still work if there's a stray extra directory\r | |
711 | # in a folder.\r | |
712 | for i in range(10):\r | |
713 | self._box.add(mailbox.Message(_sample_message))\r | |
714 | \r | |
715 | # Create a stray directory\r | |
716 | os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))\r | |
717 | \r | |
718 | # Check that looping still works with the directory present.\r | |
719 | for msg in self._box:\r | |
720 | pass\r | |
721 | \r | |
722 | def test_file_permissions(self):\r | |
723 | # Verify that message files are created without execute permissions\r | |
724 | if not hasattr(os, "stat") or not hasattr(os, "umask"):\r | |
725 | return\r | |
726 | msg = mailbox.MaildirMessage(self._template % 0)\r | |
727 | orig_umask = os.umask(0)\r | |
728 | try:\r | |
729 | key = self._box.add(msg)\r | |
730 | finally:\r | |
731 | os.umask(orig_umask)\r | |
732 | path = os.path.join(self._path, self._box._lookup(key))\r | |
733 | mode = os.stat(path).st_mode\r | |
734 | self.assertEqual(mode & 0111, 0)\r | |
735 | \r | |
736 | def test_folder_file_perms(self):\r | |
737 | # From bug #3228, we want to verify that the file created inside a Maildir\r | |
738 | # subfolder isn't marked as executable.\r | |
739 | if not hasattr(os, "stat") or not hasattr(os, "umask"):\r | |
740 | return\r | |
741 | \r | |
742 | orig_umask = os.umask(0)\r | |
743 | try:\r | |
744 | subfolder = self._box.add_folder('subfolder')\r | |
745 | finally:\r | |
746 | os.umask(orig_umask)\r | |
747 | \r | |
748 | path = os.path.join(subfolder._path, 'maildirfolder')\r | |
749 | st = os.stat(path)\r | |
750 | perms = st.st_mode\r | |
751 | self.assertFalse((perms & 0111)) # Execute bits should all be off.\r | |
752 | \r | |
753 | def test_reread(self):\r | |
754 | \r | |
755 | # Put the last modified times more than two seconds into the past\r | |
756 | # (because mtime may have only a two second granularity).\r | |
757 | for subdir in ('cur', 'new'):\r | |
758 | os.utime(os.path.join(self._box._path, subdir),\r | |
759 | (time.time()-5,)*2)\r | |
760 | \r | |
761 | # Because mtime has a two second granularity in worst case (FAT), a\r | |
762 | # refresh is done unconditionally if called for within\r | |
763 | # two-second-plus-a-bit of the last one, just in case the mbox has\r | |
764 | # changed; so now we have to wait for that interval to expire.\r | |
765 | time.sleep(2.01 + self._box._skewfactor)\r | |
766 | \r | |
767 | # Re-reading causes the ._toc attribute to be assigned a new dictionary\r | |
768 | # object, so we'll check that the ._toc attribute isn't a different\r | |
769 | # object.\r | |
770 | orig_toc = self._box._toc\r | |
771 | def refreshed():\r | |
772 | return self._box._toc is not orig_toc\r | |
773 | \r | |
774 | self._box._refresh()\r | |
775 | self.assertFalse(refreshed())\r | |
776 | \r | |
777 | # Now, write something into cur and remove it. This changes\r | |
778 | # the mtime and should cause a re-read.\r | |
779 | filename = os.path.join(self._path, 'cur', 'stray-file')\r | |
780 | f = open(filename, 'w')\r | |
781 | f.close()\r | |
782 | os.unlink(filename)\r | |
783 | self._box._refresh()\r | |
784 | self.assertTrue(refreshed())\r | |
785 | \r | |
786 | class _TestMboxMMDF(TestMailbox):\r | |
787 | \r | |
788 | def tearDown(self):\r | |
789 | self._box.close()\r | |
790 | self._delete_recursively(self._path)\r | |
791 | for lock_remnant in glob.glob(self._path + '.*'):\r | |
792 | test_support.unlink(lock_remnant)\r | |
793 | \r | |
794 | def test_add_from_string(self):\r | |
795 | # Add a string starting with 'From ' to the mailbox\r | |
796 | key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')\r | |
797 | self.assertEqual(self._box[key].get_from(), 'foo@bar blah')\r | |
798 | self.assertEqual(self._box[key].get_payload(), '0')\r | |
799 | \r | |
800 | def test_add_mbox_or_mmdf_message(self):\r | |
801 | # Add an mboxMessage or MMDFMessage\r | |
802 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
803 | msg = class_('From foo@bar blah\nFrom: foo\n\n0')\r | |
804 | key = self._box.add(msg)\r | |
805 | \r | |
806 | def test_open_close_open(self):\r | |
807 | # Open and inspect previously-created mailbox\r | |
808 | values = [self._template % i for i in xrange(3)]\r | |
809 | for value in values:\r | |
810 | self._box.add(value)\r | |
811 | self._box.close()\r | |
812 | mtime = os.path.getmtime(self._path)\r | |
813 | self._box = self._factory(self._path)\r | |
814 | self.assertEqual(len(self._box), 3)\r | |
815 | for key in self._box.iterkeys():\r | |
816 | self.assertIn(self._box.get_string(key), values)\r | |
817 | self._box.close()\r | |
818 | self.assertEqual(mtime, os.path.getmtime(self._path))\r | |
819 | \r | |
820 | def test_add_and_close(self):\r | |
821 | # Verifying that closing a mailbox doesn't change added items\r | |
822 | self._box.add(_sample_message)\r | |
823 | for i in xrange(3):\r | |
824 | self._box.add(self._template % i)\r | |
825 | self._box.add(_sample_message)\r | |
826 | self._box._file.flush()\r | |
827 | self._box._file.seek(0)\r | |
828 | contents = self._box._file.read()\r | |
829 | self._box.close()\r | |
830 | with open(self._path, 'rb') as f:\r | |
831 | self.assertEqual(contents, f.read())\r | |
832 | self._box = self._factory(self._path)\r | |
833 | \r | |
834 | def test_lock_conflict(self):\r | |
835 | # Fork off a subprocess that will lock the file for 2 seconds,\r | |
836 | # unlock it, and then exit.\r | |
837 | if not hasattr(os, 'fork'):\r | |
838 | return\r | |
839 | pid = os.fork()\r | |
840 | if pid == 0:\r | |
841 | # In the child, lock the mailbox.\r | |
842 | self._box.lock()\r | |
843 | time.sleep(2)\r | |
844 | self._box.unlock()\r | |
845 | os._exit(0)\r | |
846 | \r | |
847 | # In the parent, sleep a bit to give the child time to acquire\r | |
848 | # the lock.\r | |
849 | time.sleep(0.5)\r | |
850 | try:\r | |
851 | self.assertRaises(mailbox.ExternalClashError,\r | |
852 | self._box.lock)\r | |
853 | finally:\r | |
854 | # Wait for child to exit. Locking should now succeed.\r | |
855 | exited_pid, status = os.waitpid(pid, 0)\r | |
856 | \r | |
857 | self._box.lock()\r | |
858 | self._box.unlock()\r | |
859 | \r | |
860 | def test_relock(self):\r | |
861 | # Test case for bug #1575506: the mailbox class was locking the\r | |
862 | # wrong file object in its flush() method.\r | |
863 | msg = "Subject: sub\n\nbody\n"\r | |
864 | key1 = self._box.add(msg)\r | |
865 | self._box.flush()\r | |
866 | self._box.close()\r | |
867 | \r | |
868 | self._box = self._factory(self._path)\r | |
869 | self._box.lock()\r | |
870 | key2 = self._box.add(msg)\r | |
871 | self._box.flush()\r | |
872 | self.assertTrue(self._box._locked)\r | |
873 | self._box.close()\r | |
874 | \r | |
875 | \r | |
876 | class TestMbox(_TestMboxMMDF):\r | |
877 | \r | |
878 | _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)\r | |
879 | \r | |
880 | def test_file_perms(self):\r | |
881 | # From bug #3228, we want to verify that the mailbox file isn't executable,\r | |
882 | # even if the umask is set to something that would leave executable bits set.\r | |
883 | # We only run this test on platforms that support umask.\r | |
884 | if hasattr(os, 'umask') and hasattr(os, 'stat'):\r | |
885 | try:\r | |
886 | old_umask = os.umask(0077)\r | |
887 | self._box.close()\r | |
888 | os.unlink(self._path)\r | |
889 | self._box = mailbox.mbox(self._path, create=True)\r | |
890 | self._box.add('')\r | |
891 | self._box.close()\r | |
892 | finally:\r | |
893 | os.umask(old_umask)\r | |
894 | \r | |
895 | st = os.stat(self._path)\r | |
896 | perms = st.st_mode\r | |
897 | self.assertFalse((perms & 0111)) # Execute bits should all be off.\r | |
898 | \r | |
899 | class TestMMDF(_TestMboxMMDF):\r | |
900 | \r | |
901 | _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)\r | |
902 | \r | |
903 | \r | |
904 | class TestMH(TestMailbox):\r | |
905 | \r | |
906 | _factory = lambda self, path, factory=None: mailbox.MH(path, factory)\r | |
907 | \r | |
908 | def test_list_folders(self):\r | |
909 | # List folders\r | |
910 | self._box.add_folder('one')\r | |
911 | self._box.add_folder('two')\r | |
912 | self._box.add_folder('three')\r | |
913 | self.assertEqual(len(self._box.list_folders()), 3)\r | |
914 | self.assertEqual(set(self._box.list_folders()),\r | |
915 | set(('one', 'two', 'three')))\r | |
916 | \r | |
917 | def test_get_folder(self):\r | |
918 | # Open folders\r | |
919 | def dummy_factory (s):\r | |
920 | return None\r | |
921 | self._box = self._factory(self._path, dummy_factory)\r | |
922 | \r | |
923 | new_folder = self._box.add_folder('foo.bar')\r | |
924 | folder0 = self._box.get_folder('foo.bar')\r | |
925 | folder0.add(self._template % 'bar')\r | |
926 | self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))\r | |
927 | folder1 = self._box.get_folder('foo.bar')\r | |
928 | self.assertEqual(folder1.get_string(folder1.keys()[0]),\r | |
929 | self._template % 'bar')\r | |
930 | \r | |
931 | # Test for bug #1569790: verify that folders returned by .get_folder()\r | |
932 | # use the same factory function.\r | |
933 | self.assertIs(new_folder._factory, self._box._factory)\r | |
934 | self.assertIs(folder0._factory, self._box._factory)\r | |
935 | \r | |
936 | def test_add_and_remove_folders(self):\r | |
937 | # Delete folders\r | |
938 | self._box.add_folder('one')\r | |
939 | self._box.add_folder('two')\r | |
940 | self.assertEqual(len(self._box.list_folders()), 2)\r | |
941 | self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))\r | |
942 | self._box.remove_folder('one')\r | |
943 | self.assertEqual(len(self._box.list_folders()), 1)\r | |
944 | self.assertEqual(set(self._box.list_folders()), set(('two', )))\r | |
945 | self._box.add_folder('three')\r | |
946 | self.assertEqual(len(self._box.list_folders()), 2)\r | |
947 | self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))\r | |
948 | self._box.remove_folder('three')\r | |
949 | self.assertEqual(len(self._box.list_folders()), 1)\r | |
950 | self.assertEqual(set(self._box.list_folders()), set(('two', )))\r | |
951 | self._box.remove_folder('two')\r | |
952 | self.assertEqual(len(self._box.list_folders()), 0)\r | |
953 | self.assertEqual(self._box.list_folders(), [])\r | |
954 | \r | |
955 | def test_sequences(self):\r | |
956 | # Get and set sequences\r | |
957 | self.assertEqual(self._box.get_sequences(), {})\r | |
958 | msg0 = mailbox.MHMessage(self._template % 0)\r | |
959 | msg0.add_sequence('foo')\r | |
960 | key0 = self._box.add(msg0)\r | |
961 | self.assertEqual(self._box.get_sequences(), {'foo':[key0]})\r | |
962 | msg1 = mailbox.MHMessage(self._template % 1)\r | |
963 | msg1.set_sequences(['bar', 'replied', 'foo'])\r | |
964 | key1 = self._box.add(msg1)\r | |
965 | self.assertEqual(self._box.get_sequences(),\r | |
966 | {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})\r | |
967 | msg0.set_sequences(['flagged'])\r | |
968 | self._box[key0] = msg0\r | |
969 | self.assertEqual(self._box.get_sequences(),\r | |
970 | {'foo':[key1], 'bar':[key1], 'replied':[key1],\r | |
971 | 'flagged':[key0]})\r | |
972 | self._box.remove(key1)\r | |
973 | self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})\r | |
974 | \r | |
975 | def test_issue2625(self):\r | |
976 | msg0 = mailbox.MHMessage(self._template % 0)\r | |
977 | msg0.add_sequence('foo')\r | |
978 | key0 = self._box.add(msg0)\r | |
979 | refmsg0 = self._box.get_message(key0)\r | |
980 | \r | |
981 | def test_issue7627(self):\r | |
982 | msg0 = mailbox.MHMessage(self._template % 0)\r | |
983 | key0 = self._box.add(msg0)\r | |
984 | self._box.lock()\r | |
985 | self._box.remove(key0)\r | |
986 | self._box.unlock()\r | |
987 | \r | |
988 | def test_pack(self):\r | |
989 | # Pack the contents of the mailbox\r | |
990 | msg0 = mailbox.MHMessage(self._template % 0)\r | |
991 | msg1 = mailbox.MHMessage(self._template % 1)\r | |
992 | msg2 = mailbox.MHMessage(self._template % 2)\r | |
993 | msg3 = mailbox.MHMessage(self._template % 3)\r | |
994 | msg0.set_sequences(['foo', 'unseen'])\r | |
995 | msg1.set_sequences(['foo'])\r | |
996 | msg2.set_sequences(['foo', 'flagged'])\r | |
997 | msg3.set_sequences(['foo', 'bar', 'replied'])\r | |
998 | key0 = self._box.add(msg0)\r | |
999 | key1 = self._box.add(msg1)\r | |
1000 | key2 = self._box.add(msg2)\r | |
1001 | key3 = self._box.add(msg3)\r | |
1002 | self.assertEqual(self._box.get_sequences(),\r | |
1003 | {'foo':[key0,key1,key2,key3], 'unseen':[key0],\r | |
1004 | 'flagged':[key2], 'bar':[key3], 'replied':[key3]})\r | |
1005 | self._box.remove(key2)\r | |
1006 | self.assertEqual(self._box.get_sequences(),\r | |
1007 | {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],\r | |
1008 | 'replied':[key3]})\r | |
1009 | self._box.pack()\r | |
1010 | self.assertEqual(self._box.keys(), [1, 2, 3])\r | |
1011 | key0 = key0\r | |
1012 | key1 = key0 + 1\r | |
1013 | key2 = key1 + 1\r | |
1014 | self.assertEqual(self._box.get_sequences(),\r | |
1015 | {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})\r | |
1016 | \r | |
1017 | # Test case for packing while holding the mailbox locked.\r | |
1018 | key0 = self._box.add(msg1)\r | |
1019 | key1 = self._box.add(msg1)\r | |
1020 | key2 = self._box.add(msg1)\r | |
1021 | key3 = self._box.add(msg1)\r | |
1022 | \r | |
1023 | self._box.remove(key0)\r | |
1024 | self._box.remove(key2)\r | |
1025 | self._box.lock()\r | |
1026 | self._box.pack()\r | |
1027 | self._box.unlock()\r | |
1028 | self.assertEqual(self._box.get_sequences(),\r | |
1029 | {'foo':[1, 2, 3, 4, 5],\r | |
1030 | 'unseen':[1], 'bar':[3], 'replied':[3]})\r | |
1031 | \r | |
1032 | def _get_lock_path(self):\r | |
1033 | return os.path.join(self._path, '.mh_sequences.lock')\r | |
1034 | \r | |
1035 | \r | |
1036 | class TestBabyl(TestMailbox):\r | |
1037 | \r | |
1038 | _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)\r | |
1039 | \r | |
1040 | def tearDown(self):\r | |
1041 | self._box.close()\r | |
1042 | self._delete_recursively(self._path)\r | |
1043 | for lock_remnant in glob.glob(self._path + '.*'):\r | |
1044 | test_support.unlink(lock_remnant)\r | |
1045 | \r | |
1046 | def test_labels(self):\r | |
1047 | # Get labels from the mailbox\r | |
1048 | self.assertEqual(self._box.get_labels(), [])\r | |
1049 | msg0 = mailbox.BabylMessage(self._template % 0)\r | |
1050 | msg0.add_label('foo')\r | |
1051 | key0 = self._box.add(msg0)\r | |
1052 | self.assertEqual(self._box.get_labels(), ['foo'])\r | |
1053 | msg1 = mailbox.BabylMessage(self._template % 1)\r | |
1054 | msg1.set_labels(['bar', 'answered', 'foo'])\r | |
1055 | key1 = self._box.add(msg1)\r | |
1056 | self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))\r | |
1057 | msg0.set_labels(['blah', 'filed'])\r | |
1058 | self._box[key0] = msg0\r | |
1059 | self.assertEqual(set(self._box.get_labels()),\r | |
1060 | set(['foo', 'bar', 'blah']))\r | |
1061 | self._box.remove(key1)\r | |
1062 | self.assertEqual(set(self._box.get_labels()), set(['blah']))\r | |
1063 | \r | |
1064 | \r | |
1065 | class TestMessage(TestBase):\r | |
1066 | \r | |
1067 | _factory = mailbox.Message # Overridden by subclasses to reuse tests\r | |
1068 | \r | |
1069 | def setUp(self):\r | |
1070 | self._path = test_support.TESTFN\r | |
1071 | \r | |
1072 | def tearDown(self):\r | |
1073 | self._delete_recursively(self._path)\r | |
1074 | \r | |
1075 | def test_initialize_with_eMM(self):\r | |
1076 | # Initialize based on email.message.Message instance\r | |
1077 | eMM = email.message_from_string(_sample_message)\r | |
1078 | msg = self._factory(eMM)\r | |
1079 | self._post_initialize_hook(msg)\r | |
1080 | self._check_sample(msg)\r | |
1081 | \r | |
1082 | def test_initialize_with_string(self):\r | |
1083 | # Initialize based on string\r | |
1084 | msg = self._factory(_sample_message)\r | |
1085 | self._post_initialize_hook(msg)\r | |
1086 | self._check_sample(msg)\r | |
1087 | \r | |
1088 | def test_initialize_with_file(self):\r | |
1089 | # Initialize based on contents of file\r | |
1090 | with open(self._path, 'w+') as f:\r | |
1091 | f.write(_sample_message)\r | |
1092 | f.seek(0)\r | |
1093 | msg = self._factory(f)\r | |
1094 | self._post_initialize_hook(msg)\r | |
1095 | self._check_sample(msg)\r | |
1096 | \r | |
1097 | def test_initialize_with_nothing(self):\r | |
1098 | # Initialize without arguments\r | |
1099 | msg = self._factory()\r | |
1100 | self._post_initialize_hook(msg)\r | |
1101 | self.assertIsInstance(msg, email.message.Message)\r | |
1102 | self.assertIsInstance(msg, mailbox.Message)\r | |
1103 | self.assertIsInstance(msg, self._factory)\r | |
1104 | self.assertEqual(msg.keys(), [])\r | |
1105 | self.assertFalse(msg.is_multipart())\r | |
1106 | self.assertEqual(msg.get_payload(), None)\r | |
1107 | \r | |
1108 | def test_initialize_incorrectly(self):\r | |
1109 | # Initialize with invalid argument\r | |
1110 | self.assertRaises(TypeError, lambda: self._factory(object()))\r | |
1111 | \r | |
1112 | def test_become_message(self):\r | |
1113 | # Take on the state of another message\r | |
1114 | eMM = email.message_from_string(_sample_message)\r | |
1115 | msg = self._factory()\r | |
1116 | msg._become_message(eMM)\r | |
1117 | self._check_sample(msg)\r | |
1118 | \r | |
1119 | def test_explain_to(self):\r | |
1120 | # Copy self's format-specific data to other message formats.\r | |
1121 | # This test is superficial; better ones are in TestMessageConversion.\r | |
1122 | msg = self._factory()\r | |
1123 | for class_ in (mailbox.Message, mailbox.MaildirMessage,\r | |
1124 | mailbox.mboxMessage, mailbox.MHMessage,\r | |
1125 | mailbox.BabylMessage, mailbox.MMDFMessage):\r | |
1126 | other_msg = class_()\r | |
1127 | msg._explain_to(other_msg)\r | |
1128 | other_msg = email.message.Message()\r | |
1129 | self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))\r | |
1130 | \r | |
1131 | def _post_initialize_hook(self, msg):\r | |
1132 | # Overridden by subclasses to check extra things after initialization\r | |
1133 | pass\r | |
1134 | \r | |
1135 | \r | |
1136 | class TestMaildirMessage(TestMessage):\r | |
1137 | \r | |
1138 | _factory = mailbox.MaildirMessage\r | |
1139 | \r | |
1140 | def _post_initialize_hook(self, msg):\r | |
1141 | self.assertEqual(msg._subdir, 'new')\r | |
1142 | self.assertEqual(msg._info,'')\r | |
1143 | \r | |
1144 | def test_subdir(self):\r | |
1145 | # Use get_subdir() and set_subdir()\r | |
1146 | msg = mailbox.MaildirMessage(_sample_message)\r | |
1147 | self.assertEqual(msg.get_subdir(), 'new')\r | |
1148 | msg.set_subdir('cur')\r | |
1149 | self.assertEqual(msg.get_subdir(), 'cur')\r | |
1150 | msg.set_subdir('new')\r | |
1151 | self.assertEqual(msg.get_subdir(), 'new')\r | |
1152 | self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))\r | |
1153 | self.assertEqual(msg.get_subdir(), 'new')\r | |
1154 | msg.set_subdir('new')\r | |
1155 | self.assertEqual(msg.get_subdir(), 'new')\r | |
1156 | self._check_sample(msg)\r | |
1157 | \r | |
1158 | def test_flags(self):\r | |
1159 | # Use get_flags(), set_flags(), add_flag(), remove_flag()\r | |
1160 | msg = mailbox.MaildirMessage(_sample_message)\r | |
1161 | self.assertEqual(msg.get_flags(), '')\r | |
1162 | self.assertEqual(msg.get_subdir(), 'new')\r | |
1163 | msg.set_flags('F')\r | |
1164 | self.assertEqual(msg.get_subdir(), 'new')\r | |
1165 | self.assertEqual(msg.get_flags(), 'F')\r | |
1166 | msg.set_flags('SDTP')\r | |
1167 | self.assertEqual(msg.get_flags(), 'DPST')\r | |
1168 | msg.add_flag('FT')\r | |
1169 | self.assertEqual(msg.get_flags(), 'DFPST')\r | |
1170 | msg.remove_flag('TDRP')\r | |
1171 | self.assertEqual(msg.get_flags(), 'FS')\r | |
1172 | self.assertEqual(msg.get_subdir(), 'new')\r | |
1173 | self._check_sample(msg)\r | |
1174 | \r | |
1175 | def test_date(self):\r | |
1176 | # Use get_date() and set_date()\r | |
1177 | msg = mailbox.MaildirMessage(_sample_message)\r | |
1178 | diff = msg.get_date() - time.time()\r | |
1179 | self.assertTrue(abs(diff) < 60, diff)\r | |
1180 | msg.set_date(0.0)\r | |
1181 | self.assertEqual(msg.get_date(), 0.0)\r | |
1182 | \r | |
1183 | def test_info(self):\r | |
1184 | # Use get_info() and set_info()\r | |
1185 | msg = mailbox.MaildirMessage(_sample_message)\r | |
1186 | self.assertEqual(msg.get_info(), '')\r | |
1187 | msg.set_info('1,foo=bar')\r | |
1188 | self.assertEqual(msg.get_info(), '1,foo=bar')\r | |
1189 | self.assertRaises(TypeError, lambda: msg.set_info(None))\r | |
1190 | self._check_sample(msg)\r | |
1191 | \r | |
1192 | def test_info_and_flags(self):\r | |
1193 | # Test interaction of info and flag methods\r | |
1194 | msg = mailbox.MaildirMessage(_sample_message)\r | |
1195 | self.assertEqual(msg.get_info(), '')\r | |
1196 | msg.set_flags('SF')\r | |
1197 | self.assertEqual(msg.get_flags(), 'FS')\r | |
1198 | self.assertEqual(msg.get_info(), '2,FS')\r | |
1199 | msg.set_info('1,')\r | |
1200 | self.assertEqual(msg.get_flags(), '')\r | |
1201 | self.assertEqual(msg.get_info(), '1,')\r | |
1202 | msg.remove_flag('RPT')\r | |
1203 | self.assertEqual(msg.get_flags(), '')\r | |
1204 | self.assertEqual(msg.get_info(), '1,')\r | |
1205 | msg.add_flag('D')\r | |
1206 | self.assertEqual(msg.get_flags(), 'D')\r | |
1207 | self.assertEqual(msg.get_info(), '2,D')\r | |
1208 | self._check_sample(msg)\r | |
1209 | \r | |
1210 | \r | |
1211 | class _TestMboxMMDFMessage(TestMessage):\r | |
1212 | \r | |
1213 | _factory = mailbox._mboxMMDFMessage\r | |
1214 | \r | |
1215 | def _post_initialize_hook(self, msg):\r | |
1216 | self._check_from(msg)\r | |
1217 | \r | |
1218 | def test_initialize_with_unixfrom(self):\r | |
1219 | # Initialize with a message that already has a _unixfrom attribute\r | |
1220 | msg = mailbox.Message(_sample_message)\r | |
1221 | msg.set_unixfrom('From foo@bar blah')\r | |
1222 | msg = mailbox.mboxMessage(msg)\r | |
1223 | self.assertEqual(msg.get_from(), 'foo@bar blah')\r | |
1224 | \r | |
1225 | def test_from(self):\r | |
1226 | # Get and set "From " line\r | |
1227 | msg = mailbox.mboxMessage(_sample_message)\r | |
1228 | self._check_from(msg)\r | |
1229 | msg.set_from('foo bar')\r | |
1230 | self.assertEqual(msg.get_from(), 'foo bar')\r | |
1231 | msg.set_from('foo@bar', True)\r | |
1232 | self._check_from(msg, 'foo@bar')\r | |
1233 | msg.set_from('blah@temp', time.localtime())\r | |
1234 | self._check_from(msg, 'blah@temp')\r | |
1235 | \r | |
1236 | def test_flags(self):\r | |
1237 | # Use get_flags(), set_flags(), add_flag(), remove_flag()\r | |
1238 | msg = mailbox.mboxMessage(_sample_message)\r | |
1239 | self.assertEqual(msg.get_flags(), '')\r | |
1240 | msg.set_flags('F')\r | |
1241 | self.assertEqual(msg.get_flags(), 'F')\r | |
1242 | msg.set_flags('XODR')\r | |
1243 | self.assertEqual(msg.get_flags(), 'RODX')\r | |
1244 | msg.add_flag('FA')\r | |
1245 | self.assertEqual(msg.get_flags(), 'RODFAX')\r | |
1246 | msg.remove_flag('FDXA')\r | |
1247 | self.assertEqual(msg.get_flags(), 'RO')\r | |
1248 | self._check_sample(msg)\r | |
1249 | \r | |
1250 | def _check_from(self, msg, sender=None):\r | |
1251 | # Check contents of "From " line\r | |
1252 | if sender is None:\r | |
1253 | sender = "MAILER-DAEMON"\r | |
1254 | self.assertTrue(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:"\r | |
1255 | r"\d{2} \d{4}", msg.get_from()))\r | |
1256 | \r | |
1257 | \r | |
1258 | class TestMboxMessage(_TestMboxMMDFMessage):\r | |
1259 | \r | |
1260 | _factory = mailbox.mboxMessage\r | |
1261 | \r | |
1262 | \r | |
1263 | class TestMHMessage(TestMessage):\r | |
1264 | \r | |
1265 | _factory = mailbox.MHMessage\r | |
1266 | \r | |
1267 | def _post_initialize_hook(self, msg):\r | |
1268 | self.assertEqual(msg._sequences, [])\r | |
1269 | \r | |
1270 | def test_sequences(self):\r | |
1271 | # Get, set, join, and leave sequences\r | |
1272 | msg = mailbox.MHMessage(_sample_message)\r | |
1273 | self.assertEqual(msg.get_sequences(), [])\r | |
1274 | msg.set_sequences(['foobar'])\r | |
1275 | self.assertEqual(msg.get_sequences(), ['foobar'])\r | |
1276 | msg.set_sequences([])\r | |
1277 | self.assertEqual(msg.get_sequences(), [])\r | |
1278 | msg.add_sequence('unseen')\r | |
1279 | self.assertEqual(msg.get_sequences(), ['unseen'])\r | |
1280 | msg.add_sequence('flagged')\r | |
1281 | self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])\r | |
1282 | msg.add_sequence('flagged')\r | |
1283 | self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])\r | |
1284 | msg.remove_sequence('unseen')\r | |
1285 | self.assertEqual(msg.get_sequences(), ['flagged'])\r | |
1286 | msg.add_sequence('foobar')\r | |
1287 | self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])\r | |
1288 | msg.remove_sequence('replied')\r | |
1289 | self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])\r | |
1290 | msg.set_sequences(['foobar', 'replied'])\r | |
1291 | self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])\r | |
1292 | \r | |
1293 | \r | |
1294 | class TestBabylMessage(TestMessage):\r | |
1295 | \r | |
1296 | _factory = mailbox.BabylMessage\r | |
1297 | \r | |
1298 | def _post_initialize_hook(self, msg):\r | |
1299 | self.assertEqual(msg._labels, [])\r | |
1300 | \r | |
1301 | def test_labels(self):\r | |
1302 | # Get, set, join, and leave labels\r | |
1303 | msg = mailbox.BabylMessage(_sample_message)\r | |
1304 | self.assertEqual(msg.get_labels(), [])\r | |
1305 | msg.set_labels(['foobar'])\r | |
1306 | self.assertEqual(msg.get_labels(), ['foobar'])\r | |
1307 | msg.set_labels([])\r | |
1308 | self.assertEqual(msg.get_labels(), [])\r | |
1309 | msg.add_label('filed')\r | |
1310 | self.assertEqual(msg.get_labels(), ['filed'])\r | |
1311 | msg.add_label('resent')\r | |
1312 | self.assertEqual(msg.get_labels(), ['filed', 'resent'])\r | |
1313 | msg.add_label('resent')\r | |
1314 | self.assertEqual(msg.get_labels(), ['filed', 'resent'])\r | |
1315 | msg.remove_label('filed')\r | |
1316 | self.assertEqual(msg.get_labels(), ['resent'])\r | |
1317 | msg.add_label('foobar')\r | |
1318 | self.assertEqual(msg.get_labels(), ['resent', 'foobar'])\r | |
1319 | msg.remove_label('unseen')\r | |
1320 | self.assertEqual(msg.get_labels(), ['resent', 'foobar'])\r | |
1321 | msg.set_labels(['foobar', 'answered'])\r | |
1322 | self.assertEqual(msg.get_labels(), ['foobar', 'answered'])\r | |
1323 | \r | |
1324 | def test_visible(self):\r | |
1325 | # Get, set, and update visible headers\r | |
1326 | msg = mailbox.BabylMessage(_sample_message)\r | |
1327 | visible = msg.get_visible()\r | |
1328 | self.assertEqual(visible.keys(), [])\r | |
1329 | self.assertIs(visible.get_payload(), None)\r | |
1330 | visible['User-Agent'] = 'FooBar 1.0'\r | |
1331 | visible['X-Whatever'] = 'Blah'\r | |
1332 | self.assertEqual(msg.get_visible().keys(), [])\r | |
1333 | msg.set_visible(visible)\r | |
1334 | visible = msg.get_visible()\r | |
1335 | self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])\r | |
1336 | self.assertEqual(visible['User-Agent'], 'FooBar 1.0')\r | |
1337 | self.assertEqual(visible['X-Whatever'], 'Blah')\r | |
1338 | self.assertIs(visible.get_payload(), None)\r | |
1339 | msg.update_visible()\r | |
1340 | self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])\r | |
1341 | self.assertIs(visible.get_payload(), None)\r | |
1342 | visible = msg.get_visible()\r | |
1343 | self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',\r | |
1344 | 'Subject'])\r | |
1345 | for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):\r | |
1346 | self.assertEqual(visible[header], msg[header])\r | |
1347 | \r | |
1348 | \r | |
1349 | class TestMMDFMessage(_TestMboxMMDFMessage):\r | |
1350 | \r | |
1351 | _factory = mailbox.MMDFMessage\r | |
1352 | \r | |
1353 | \r | |
1354 | class TestMessageConversion(TestBase):\r | |
1355 | \r | |
1356 | def test_plain_to_x(self):\r | |
1357 | # Convert Message to all formats\r | |
1358 | for class_ in (mailbox.Message, mailbox.MaildirMessage,\r | |
1359 | mailbox.mboxMessage, mailbox.MHMessage,\r | |
1360 | mailbox.BabylMessage, mailbox.MMDFMessage):\r | |
1361 | msg_plain = mailbox.Message(_sample_message)\r | |
1362 | msg = class_(msg_plain)\r | |
1363 | self._check_sample(msg)\r | |
1364 | \r | |
1365 | def test_x_to_plain(self):\r | |
1366 | # Convert all formats to Message\r | |
1367 | for class_ in (mailbox.Message, mailbox.MaildirMessage,\r | |
1368 | mailbox.mboxMessage, mailbox.MHMessage,\r | |
1369 | mailbox.BabylMessage, mailbox.MMDFMessage):\r | |
1370 | msg = class_(_sample_message)\r | |
1371 | msg_plain = mailbox.Message(msg)\r | |
1372 | self._check_sample(msg_plain)\r | |
1373 | \r | |
1374 | def test_x_to_invalid(self):\r | |
1375 | # Convert all formats to an invalid format\r | |
1376 | for class_ in (mailbox.Message, mailbox.MaildirMessage,\r | |
1377 | mailbox.mboxMessage, mailbox.MHMessage,\r | |
1378 | mailbox.BabylMessage, mailbox.MMDFMessage):\r | |
1379 | self.assertRaises(TypeError, lambda: class_(False))\r | |
1380 | \r | |
1381 | def test_maildir_to_maildir(self):\r | |
1382 | # Convert MaildirMessage to MaildirMessage\r | |
1383 | msg_maildir = mailbox.MaildirMessage(_sample_message)\r | |
1384 | msg_maildir.set_flags('DFPRST')\r | |
1385 | msg_maildir.set_subdir('cur')\r | |
1386 | date = msg_maildir.get_date()\r | |
1387 | msg = mailbox.MaildirMessage(msg_maildir)\r | |
1388 | self._check_sample(msg)\r | |
1389 | self.assertEqual(msg.get_flags(), 'DFPRST')\r | |
1390 | self.assertEqual(msg.get_subdir(), 'cur')\r | |
1391 | self.assertEqual(msg.get_date(), date)\r | |
1392 | \r | |
1393 | def test_maildir_to_mboxmmdf(self):\r | |
1394 | # Convert MaildirMessage to mboxmessage and MMDFMessage\r | |
1395 | pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),\r | |
1396 | ('T', 'D'), ('DFPRST', 'RDFA'))\r | |
1397 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1398 | msg_maildir = mailbox.MaildirMessage(_sample_message)\r | |
1399 | msg_maildir.set_date(0.0)\r | |
1400 | for setting, result in pairs:\r | |
1401 | msg_maildir.set_flags(setting)\r | |
1402 | msg = class_(msg_maildir)\r | |
1403 | self.assertEqual(msg.get_flags(), result)\r | |
1404 | self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %\r | |
1405 | time.asctime(time.gmtime(0.0)))\r | |
1406 | msg_maildir.set_subdir('cur')\r | |
1407 | self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')\r | |
1408 | \r | |
1409 | def test_maildir_to_mh(self):\r | |
1410 | # Convert MaildirMessage to MHMessage\r | |
1411 | msg_maildir = mailbox.MaildirMessage(_sample_message)\r | |
1412 | pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),\r | |
1413 | ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),\r | |
1414 | ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))\r | |
1415 | for setting, result in pairs:\r | |
1416 | msg_maildir.set_flags(setting)\r | |
1417 | self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),\r | |
1418 | result)\r | |
1419 | \r | |
1420 | def test_maildir_to_babyl(self):\r | |
1421 | # Convert MaildirMessage to Babyl\r | |
1422 | msg_maildir = mailbox.MaildirMessage(_sample_message)\r | |
1423 | pairs = (('D', ['unseen']), ('F', ['unseen']),\r | |
1424 | ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),\r | |
1425 | ('S', []), ('T', ['unseen', 'deleted']),\r | |
1426 | ('DFPRST', ['deleted', 'answered', 'forwarded']))\r | |
1427 | for setting, result in pairs:\r | |
1428 | msg_maildir.set_flags(setting)\r | |
1429 | self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),\r | |
1430 | result)\r | |
1431 | \r | |
1432 | def test_mboxmmdf_to_maildir(self):\r | |
1433 | # Convert mboxMessage and MMDFMessage to MaildirMessage\r | |
1434 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1435 | msg_mboxMMDF = class_(_sample_message)\r | |
1436 | msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))\r | |
1437 | pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),\r | |
1438 | ('RODFA', 'FRST'))\r | |
1439 | for setting, result in pairs:\r | |
1440 | msg_mboxMMDF.set_flags(setting)\r | |
1441 | msg = mailbox.MaildirMessage(msg_mboxMMDF)\r | |
1442 | self.assertEqual(msg.get_flags(), result)\r | |
1443 | self.assertEqual(msg.get_date(), 0.0)\r | |
1444 | msg_mboxMMDF.set_flags('O')\r | |
1445 | self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(),\r | |
1446 | 'cur')\r | |
1447 | \r | |
1448 | def test_mboxmmdf_to_mboxmmdf(self):\r | |
1449 | # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage\r | |
1450 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1451 | msg_mboxMMDF = class_(_sample_message)\r | |
1452 | msg_mboxMMDF.set_flags('RODFA')\r | |
1453 | msg_mboxMMDF.set_from('foo@bar')\r | |
1454 | for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1455 | msg2 = class2_(msg_mboxMMDF)\r | |
1456 | self.assertEqual(msg2.get_flags(), 'RODFA')\r | |
1457 | self.assertEqual(msg2.get_from(), 'foo@bar')\r | |
1458 | \r | |
1459 | def test_mboxmmdf_to_mh(self):\r | |
1460 | # Convert mboxMessage and MMDFMessage to MHMessage\r | |
1461 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1462 | msg_mboxMMDF = class_(_sample_message)\r | |
1463 | pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),\r | |
1464 | ('F', ['unseen', 'flagged']),\r | |
1465 | ('A', ['unseen', 'replied']),\r | |
1466 | ('RODFA', ['replied', 'flagged']))\r | |
1467 | for setting, result in pairs:\r | |
1468 | msg_mboxMMDF.set_flags(setting)\r | |
1469 | self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(),\r | |
1470 | result)\r | |
1471 | \r | |
1472 | def test_mboxmmdf_to_babyl(self):\r | |
1473 | # Convert mboxMessage and MMDFMessage to BabylMessage\r | |
1474 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1475 | msg = class_(_sample_message)\r | |
1476 | pairs = (('R', []), ('O', ['unseen']),\r | |
1477 | ('D', ['unseen', 'deleted']), ('F', ['unseen']),\r | |
1478 | ('A', ['unseen', 'answered']),\r | |
1479 | ('RODFA', ['deleted', 'answered']))\r | |
1480 | for setting, result in pairs:\r | |
1481 | msg.set_flags(setting)\r | |
1482 | self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)\r | |
1483 | \r | |
1484 | def test_mh_to_maildir(self):\r | |
1485 | # Convert MHMessage to MaildirMessage\r | |
1486 | pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))\r | |
1487 | for setting, result in pairs:\r | |
1488 | msg = mailbox.MHMessage(_sample_message)\r | |
1489 | msg.add_sequence(setting)\r | |
1490 | self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)\r | |
1491 | self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')\r | |
1492 | msg = mailbox.MHMessage(_sample_message)\r | |
1493 | msg.add_sequence('unseen')\r | |
1494 | msg.add_sequence('replied')\r | |
1495 | msg.add_sequence('flagged')\r | |
1496 | self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')\r | |
1497 | self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')\r | |
1498 | \r | |
1499 | def test_mh_to_mboxmmdf(self):\r | |
1500 | # Convert MHMessage to mboxMessage and MMDFMessage\r | |
1501 | pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))\r | |
1502 | for setting, result in pairs:\r | |
1503 | msg = mailbox.MHMessage(_sample_message)\r | |
1504 | msg.add_sequence(setting)\r | |
1505 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1506 | self.assertEqual(class_(msg).get_flags(), result)\r | |
1507 | msg = mailbox.MHMessage(_sample_message)\r | |
1508 | msg.add_sequence('unseen')\r | |
1509 | msg.add_sequence('replied')\r | |
1510 | msg.add_sequence('flagged')\r | |
1511 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1512 | self.assertEqual(class_(msg).get_flags(), 'OFA')\r | |
1513 | \r | |
1514 | def test_mh_to_mh(self):\r | |
1515 | # Convert MHMessage to MHMessage\r | |
1516 | msg = mailbox.MHMessage(_sample_message)\r | |
1517 | msg.add_sequence('unseen')\r | |
1518 | msg.add_sequence('replied')\r | |
1519 | msg.add_sequence('flagged')\r | |
1520 | self.assertEqual(mailbox.MHMessage(msg).get_sequences(),\r | |
1521 | ['unseen', 'replied', 'flagged'])\r | |
1522 | \r | |
1523 | def test_mh_to_babyl(self):\r | |
1524 | # Convert MHMessage to BabylMessage\r | |
1525 | pairs = (('unseen', ['unseen']), ('replied', ['answered']),\r | |
1526 | ('flagged', []))\r | |
1527 | for setting, result in pairs:\r | |
1528 | msg = mailbox.MHMessage(_sample_message)\r | |
1529 | msg.add_sequence(setting)\r | |
1530 | self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)\r | |
1531 | msg = mailbox.MHMessage(_sample_message)\r | |
1532 | msg.add_sequence('unseen')\r | |
1533 | msg.add_sequence('replied')\r | |
1534 | msg.add_sequence('flagged')\r | |
1535 | self.assertEqual(mailbox.BabylMessage(msg).get_labels(),\r | |
1536 | ['unseen', 'answered'])\r | |
1537 | \r | |
1538 | def test_babyl_to_maildir(self):\r | |
1539 | # Convert BabylMessage to MaildirMessage\r | |
1540 | pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),\r | |
1541 | ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),\r | |
1542 | ('resent', 'PS'))\r | |
1543 | for setting, result in pairs:\r | |
1544 | msg = mailbox.BabylMessage(_sample_message)\r | |
1545 | msg.add_label(setting)\r | |
1546 | self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)\r | |
1547 | self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')\r | |
1548 | msg = mailbox.BabylMessage(_sample_message)\r | |
1549 | for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',\r | |
1550 | 'edited', 'resent'):\r | |
1551 | msg.add_label(label)\r | |
1552 | self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')\r | |
1553 | self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')\r | |
1554 | \r | |
1555 | def test_babyl_to_mboxmmdf(self):\r | |
1556 | # Convert BabylMessage to mboxMessage and MMDFMessage\r | |
1557 | pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),\r | |
1558 | ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),\r | |
1559 | ('resent', 'RO'))\r | |
1560 | for setting, result in pairs:\r | |
1561 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1562 | msg = mailbox.BabylMessage(_sample_message)\r | |
1563 | msg.add_label(setting)\r | |
1564 | self.assertEqual(class_(msg).get_flags(), result)\r | |
1565 | msg = mailbox.BabylMessage(_sample_message)\r | |
1566 | for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',\r | |
1567 | 'edited', 'resent'):\r | |
1568 | msg.add_label(label)\r | |
1569 | for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):\r | |
1570 | self.assertEqual(class_(msg).get_flags(), 'ODA')\r | |
1571 | \r | |
1572 | def test_babyl_to_mh(self):\r | |
1573 | # Convert BabylMessage to MHMessage\r | |
1574 | pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),\r | |
1575 | ('answered', ['replied']), ('forwarded', []), ('edited', []),\r | |
1576 | ('resent', []))\r | |
1577 | for setting, result in pairs:\r | |
1578 | msg = mailbox.BabylMessage(_sample_message)\r | |
1579 | msg.add_label(setting)\r | |
1580 | self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)\r | |
1581 | msg = mailbox.BabylMessage(_sample_message)\r | |
1582 | for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',\r | |
1583 | 'edited', 'resent'):\r | |
1584 | msg.add_label(label)\r | |
1585 | self.assertEqual(mailbox.MHMessage(msg).get_sequences(),\r | |
1586 | ['unseen', 'replied'])\r | |
1587 | \r | |
1588 | def test_babyl_to_babyl(self):\r | |
1589 | # Convert BabylMessage to BabylMessage\r | |
1590 | msg = mailbox.BabylMessage(_sample_message)\r | |
1591 | msg.update_visible()\r | |
1592 | for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',\r | |
1593 | 'edited', 'resent'):\r | |
1594 | msg.add_label(label)\r | |
1595 | msg2 = mailbox.BabylMessage(msg)\r | |
1596 | self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',\r | |
1597 | 'answered', 'forwarded', 'edited',\r | |
1598 | 'resent'])\r | |
1599 | self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())\r | |
1600 | for key in msg.get_visible().keys():\r | |
1601 | self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])\r | |
1602 | \r | |
1603 | \r | |
1604 | class TestProxyFileBase(TestBase):\r | |
1605 | \r | |
1606 | def _test_read(self, proxy):\r | |
1607 | # Read by byte\r | |
1608 | proxy.seek(0)\r | |
1609 | self.assertEqual(proxy.read(), 'bar')\r | |
1610 | proxy.seek(1)\r | |
1611 | self.assertEqual(proxy.read(), 'ar')\r | |
1612 | proxy.seek(0)\r | |
1613 | self.assertEqual(proxy.read(2), 'ba')\r | |
1614 | proxy.seek(1)\r | |
1615 | self.assertEqual(proxy.read(-1), 'ar')\r | |
1616 | proxy.seek(2)\r | |
1617 | self.assertEqual(proxy.read(1000), 'r')\r | |
1618 | \r | |
1619 | def _test_readline(self, proxy):\r | |
1620 | # Read by line\r | |
1621 | proxy.seek(0)\r | |
1622 | self.assertEqual(proxy.readline(), 'foo' + os.linesep)\r | |
1623 | self.assertEqual(proxy.readline(), 'bar' + os.linesep)\r | |
1624 | self.assertEqual(proxy.readline(), 'fred' + os.linesep)\r | |
1625 | self.assertEqual(proxy.readline(), 'bob')\r | |
1626 | proxy.seek(2)\r | |
1627 | self.assertEqual(proxy.readline(), 'o' + os.linesep)\r | |
1628 | proxy.seek(6 + 2 * len(os.linesep))\r | |
1629 | self.assertEqual(proxy.readline(), 'fred' + os.linesep)\r | |
1630 | proxy.seek(6 + 2 * len(os.linesep))\r | |
1631 | self.assertEqual(proxy.readline(2), 'fr')\r | |
1632 | self.assertEqual(proxy.readline(-10), 'ed' + os.linesep)\r | |
1633 | \r | |
1634 | def _test_readlines(self, proxy):\r | |
1635 | # Read multiple lines\r | |
1636 | proxy.seek(0)\r | |
1637 | self.assertEqual(proxy.readlines(), ['foo' + os.linesep,\r | |
1638 | 'bar' + os.linesep,\r | |
1639 | 'fred' + os.linesep, 'bob'])\r | |
1640 | proxy.seek(0)\r | |
1641 | self.assertEqual(proxy.readlines(2), ['foo' + os.linesep])\r | |
1642 | proxy.seek(3 + len(os.linesep))\r | |
1643 | self.assertEqual(proxy.readlines(4 + len(os.linesep)),\r | |
1644 | ['bar' + os.linesep, 'fred' + os.linesep])\r | |
1645 | proxy.seek(3)\r | |
1646 | self.assertEqual(proxy.readlines(1000), [os.linesep, 'bar' + os.linesep,\r | |
1647 | 'fred' + os.linesep, 'bob'])\r | |
1648 | \r | |
1649 | def _test_iteration(self, proxy):\r | |
1650 | # Iterate by line\r | |
1651 | proxy.seek(0)\r | |
1652 | iterator = iter(proxy)\r | |
1653 | self.assertEqual(list(iterator),\r | |
1654 | ['foo' + os.linesep, 'bar' + os.linesep, 'fred' + os.linesep, 'bob'])\r | |
1655 | \r | |
1656 | def _test_seek_and_tell(self, proxy):\r | |
1657 | # Seek and use tell to check position\r | |
1658 | proxy.seek(3)\r | |
1659 | self.assertEqual(proxy.tell(), 3)\r | |
1660 | self.assertEqual(proxy.read(len(os.linesep)), os.linesep)\r | |
1661 | proxy.seek(2, 1)\r | |
1662 | self.assertEqual(proxy.read(1 + len(os.linesep)), 'r' + os.linesep)\r | |
1663 | proxy.seek(-3 - len(os.linesep), 2)\r | |
1664 | self.assertEqual(proxy.read(3), 'bar')\r | |
1665 | proxy.seek(2, 0)\r | |
1666 | self.assertEqual(proxy.read(), 'o' + os.linesep + 'bar' + os.linesep)\r | |
1667 | proxy.seek(100)\r | |
1668 | self.assertEqual(proxy.read(), '')\r | |
1669 | \r | |
1670 | def _test_close(self, proxy):\r | |
1671 | # Close a file\r | |
1672 | proxy.close()\r | |
1673 | self.assertRaises(AttributeError, lambda: proxy.close())\r | |
1674 | \r | |
1675 | \r | |
1676 | class TestProxyFile(TestProxyFileBase):\r | |
1677 | \r | |
1678 | def setUp(self):\r | |
1679 | self._path = test_support.TESTFN\r | |
1680 | self._file = open(self._path, 'wb+')\r | |
1681 | \r | |
1682 | def tearDown(self):\r | |
1683 | self._file.close()\r | |
1684 | self._delete_recursively(self._path)\r | |
1685 | \r | |
1686 | def test_initialize(self):\r | |
1687 | # Initialize and check position\r | |
1688 | self._file.write('foo')\r | |
1689 | pos = self._file.tell()\r | |
1690 | proxy0 = mailbox._ProxyFile(self._file)\r | |
1691 | self.assertEqual(proxy0.tell(), pos)\r | |
1692 | self.assertEqual(self._file.tell(), pos)\r | |
1693 | proxy1 = mailbox._ProxyFile(self._file, 0)\r | |
1694 | self.assertEqual(proxy1.tell(), 0)\r | |
1695 | self.assertEqual(self._file.tell(), pos)\r | |
1696 | \r | |
1697 | def test_read(self):\r | |
1698 | self._file.write('bar')\r | |
1699 | self._test_read(mailbox._ProxyFile(self._file))\r | |
1700 | \r | |
1701 | def test_readline(self):\r | |
1702 | self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,\r | |
1703 | os.linesep))\r | |
1704 | self._test_readline(mailbox._ProxyFile(self._file))\r | |
1705 | \r | |
1706 | def test_readlines(self):\r | |
1707 | self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,\r | |
1708 | os.linesep))\r | |
1709 | self._test_readlines(mailbox._ProxyFile(self._file))\r | |
1710 | \r | |
1711 | def test_iteration(self):\r | |
1712 | self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,\r | |
1713 | os.linesep))\r | |
1714 | self._test_iteration(mailbox._ProxyFile(self._file))\r | |
1715 | \r | |
1716 | def test_seek_and_tell(self):\r | |
1717 | self._file.write('foo%sbar%s' % (os.linesep, os.linesep))\r | |
1718 | self._test_seek_and_tell(mailbox._ProxyFile(self._file))\r | |
1719 | \r | |
1720 | def test_close(self):\r | |
1721 | self._file.write('foo%sbar%s' % (os.linesep, os.linesep))\r | |
1722 | self._test_close(mailbox._ProxyFile(self._file))\r | |
1723 | \r | |
1724 | \r | |
1725 | class TestPartialFile(TestProxyFileBase):\r | |
1726 | \r | |
1727 | def setUp(self):\r | |
1728 | self._path = test_support.TESTFN\r | |
1729 | self._file = open(self._path, 'wb+')\r | |
1730 | \r | |
1731 | def tearDown(self):\r | |
1732 | self._file.close()\r | |
1733 | self._delete_recursively(self._path)\r | |
1734 | \r | |
1735 | def test_initialize(self):\r | |
1736 | # Initialize and check position\r | |
1737 | self._file.write('foo' + os.linesep + 'bar')\r | |
1738 | pos = self._file.tell()\r | |
1739 | proxy = mailbox._PartialFile(self._file, 2, 5)\r | |
1740 | self.assertEqual(proxy.tell(), 0)\r | |
1741 | self.assertEqual(self._file.tell(), pos)\r | |
1742 | \r | |
1743 | def test_read(self):\r | |
1744 | self._file.write('***bar***')\r | |
1745 | self._test_read(mailbox._PartialFile(self._file, 3, 6))\r | |
1746 | \r | |
1747 | def test_readline(self):\r | |
1748 | self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' %\r | |
1749 | (os.linesep, os.linesep, os.linesep))\r | |
1750 | self._test_readline(mailbox._PartialFile(self._file, 5,\r | |
1751 | 18 + 3 * len(os.linesep)))\r | |
1752 | \r | |
1753 | def test_readlines(self):\r | |
1754 | self._file.write('foo%sbar%sfred%sbob?????' %\r | |
1755 | (os.linesep, os.linesep, os.linesep))\r | |
1756 | self._test_readlines(mailbox._PartialFile(self._file, 0,\r | |
1757 | 13 + 3 * len(os.linesep)))\r | |
1758 | \r | |
1759 | def test_iteration(self):\r | |
1760 | self._file.write('____foo%sbar%sfred%sbob####' %\r | |
1761 | (os.linesep, os.linesep, os.linesep))\r | |
1762 | self._test_iteration(mailbox._PartialFile(self._file, 4,\r | |
1763 | 17 + 3 * len(os.linesep)))\r | |
1764 | \r | |
1765 | def test_seek_and_tell(self):\r | |
1766 | self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep))\r | |
1767 | self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,\r | |
1768 | 9 + 2 * len(os.linesep)))\r | |
1769 | \r | |
1770 | def test_close(self):\r | |
1771 | self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep))\r | |
1772 | self._test_close(mailbox._PartialFile(self._file, 1,\r | |
1773 | 6 + 3 * len(os.linesep)))\r | |
1774 | \r | |
1775 | \r | |
1776 | ## Start: tests from the original module (for backward compatibility).\r | |
1777 | \r | |
1778 | FROM_ = "From some.body@dummy.domain Sat Jul 24 13:43:35 2004\n"\r | |
1779 | DUMMY_MESSAGE = """\\r | |
1780 | From: some.body@dummy.domain\r | |
1781 | To: me@my.domain\r | |
1782 | Subject: Simple Test\r | |
1783 | \r | |
1784 | This is a dummy message.\r | |
1785 | """\r | |
1786 | \r | |
1787 | class MaildirTestCase(unittest.TestCase):\r | |
1788 | \r | |
1789 | def setUp(self):\r | |
1790 | # create a new maildir mailbox to work with:\r | |
1791 | self._dir = test_support.TESTFN\r | |
1792 | os.mkdir(self._dir)\r | |
1793 | os.mkdir(os.path.join(self._dir, "cur"))\r | |
1794 | os.mkdir(os.path.join(self._dir, "tmp"))\r | |
1795 | os.mkdir(os.path.join(self._dir, "new"))\r | |
1796 | self._counter = 1\r | |
1797 | self._msgfiles = []\r | |
1798 | \r | |
1799 | def tearDown(self):\r | |
1800 | map(os.unlink, self._msgfiles)\r | |
1801 | os.rmdir(os.path.join(self._dir, "cur"))\r | |
1802 | os.rmdir(os.path.join(self._dir, "tmp"))\r | |
1803 | os.rmdir(os.path.join(self._dir, "new"))\r | |
1804 | os.rmdir(self._dir)\r | |
1805 | \r | |
1806 | def createMessage(self, dir, mbox=False):\r | |
1807 | t = int(time.time() % 1000000)\r | |
1808 | pid = self._counter\r | |
1809 | self._counter += 1\r | |
1810 | filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain"))\r | |
1811 | tmpname = os.path.join(self._dir, "tmp", filename)\r | |
1812 | newname = os.path.join(self._dir, dir, filename)\r | |
1813 | with open(tmpname, "w") as fp:\r | |
1814 | self._msgfiles.append(tmpname)\r | |
1815 | if mbox:\r | |
1816 | fp.write(FROM_)\r | |
1817 | fp.write(DUMMY_MESSAGE)\r | |
1818 | if hasattr(os, "link"):\r | |
1819 | os.link(tmpname, newname)\r | |
1820 | else:\r | |
1821 | with open(newname, "w") as fp:\r | |
1822 | fp.write(DUMMY_MESSAGE)\r | |
1823 | self._msgfiles.append(newname)\r | |
1824 | return tmpname\r | |
1825 | \r | |
1826 | def test_empty_maildir(self):\r | |
1827 | """Test an empty maildir mailbox"""\r | |
1828 | # Test for regression on bug #117490:\r | |
1829 | # Make sure the boxes attribute actually gets set.\r | |
1830 | self.mbox = mailbox.Maildir(test_support.TESTFN)\r | |
1831 | #self.assertTrue(hasattr(self.mbox, "boxes"))\r | |
1832 | #self.assertTrue(len(self.mbox.boxes) == 0)\r | |
1833 | self.assertIs(self.mbox.next(), None)\r | |
1834 | self.assertIs(self.mbox.next(), None)\r | |
1835 | \r | |
1836 | def test_nonempty_maildir_cur(self):\r | |
1837 | self.createMessage("cur")\r | |
1838 | self.mbox = mailbox.Maildir(test_support.TESTFN)\r | |
1839 | #self.assertTrue(len(self.mbox.boxes) == 1)\r | |
1840 | self.assertIsNot(self.mbox.next(), None)\r | |
1841 | self.assertIs(self.mbox.next(), None)\r | |
1842 | self.assertIs(self.mbox.next(), None)\r | |
1843 | \r | |
1844 | def test_nonempty_maildir_new(self):\r | |
1845 | self.createMessage("new")\r | |
1846 | self.mbox = mailbox.Maildir(test_support.TESTFN)\r | |
1847 | #self.assertTrue(len(self.mbox.boxes) == 1)\r | |
1848 | self.assertIsNot(self.mbox.next(), None)\r | |
1849 | self.assertIs(self.mbox.next(), None)\r | |
1850 | self.assertIs(self.mbox.next(), None)\r | |
1851 | \r | |
1852 | def test_nonempty_maildir_both(self):\r | |
1853 | self.createMessage("cur")\r | |
1854 | self.createMessage("new")\r | |
1855 | self.mbox = mailbox.Maildir(test_support.TESTFN)\r | |
1856 | #self.assertTrue(len(self.mbox.boxes) == 2)\r | |
1857 | self.assertIsNot(self.mbox.next(), None)\r | |
1858 | self.assertIsNot(self.mbox.next(), None)\r | |
1859 | self.assertIs(self.mbox.next(), None)\r | |
1860 | self.assertIs(self.mbox.next(), None)\r | |
1861 | \r | |
1862 | def test_unix_mbox(self):\r | |
1863 | ### should be better!\r | |
1864 | import email.parser\r | |
1865 | fname = self.createMessage("cur", True)\r | |
1866 | n = 0\r | |
1867 | for msg in mailbox.PortableUnixMailbox(open(fname),\r | |
1868 | email.parser.Parser().parse):\r | |
1869 | n += 1\r | |
1870 | self.assertEqual(msg["subject"], "Simple Test")\r | |
1871 | self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))\r | |
1872 | self.assertEqual(n, 1)\r | |
1873 | \r | |
1874 | ## End: classes from the original module (for backward compatibility).\r | |
1875 | \r | |
1876 | \r | |
1877 | _sample_message = """\\r | |
1878 | Return-Path: <gkj@gregorykjohnson.com>\r | |
1879 | X-Original-To: gkj+person@localhost\r | |
1880 | Delivered-To: gkj+person@localhost\r | |
1881 | Received: from localhost (localhost [127.0.0.1])\r | |
1882 | by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17\r | |
1883 | for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)\r | |
1884 | Delivered-To: gkj@sundance.gregorykjohnson.com\r | |
1885 | Received: from localhost [127.0.0.1]\r | |
1886 | by localhost with POP3 (fetchmail-6.2.5)\r | |
1887 | for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)\r | |
1888 | Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])\r | |
1889 | by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746\r | |
1890 | for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)\r | |
1891 | Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)\r | |
1892 | id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)\r | |
1893 | Date: Wed, 13 Jul 2005 17:23:11 -0400\r | |
1894 | From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>\r | |
1895 | To: gkj@gregorykjohnson.com\r | |
1896 | Subject: Sample message\r | |
1897 | Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>\r | |
1898 | Mime-Version: 1.0\r | |
1899 | Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"\r | |
1900 | Content-Disposition: inline\r | |
1901 | User-Agent: Mutt/1.5.9i\r | |
1902 | \r | |
1903 | \r | |
1904 | --NMuMz9nt05w80d4+\r | |
1905 | Content-Type: text/plain; charset=us-ascii\r | |
1906 | Content-Disposition: inline\r | |
1907 | \r | |
1908 | This is a sample message.\r | |
1909 | \r | |
1910 | --\r | |
1911 | Gregory K. Johnson\r | |
1912 | \r | |
1913 | --NMuMz9nt05w80d4+\r | |
1914 | Content-Type: application/octet-stream\r | |
1915 | Content-Disposition: attachment; filename="text.gz"\r | |
1916 | Content-Transfer-Encoding: base64\r | |
1917 | \r | |
1918 | H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs\r | |
1919 | 3FYlAAAA\r | |
1920 | \r | |
1921 | --NMuMz9nt05w80d4+--\r | |
1922 | """\r | |
1923 | \r | |
1924 | _sample_headers = {\r | |
1925 | "Return-Path":"<gkj@gregorykjohnson.com>",\r | |
1926 | "X-Original-To":"gkj+person@localhost",\r | |
1927 | "Delivered-To":"gkj+person@localhost",\r | |
1928 | "Received":"""from localhost (localhost [127.0.0.1])\r | |
1929 | by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17\r | |
1930 | for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",\r | |
1931 | "Delivered-To":"gkj@sundance.gregorykjohnson.com",\r | |
1932 | "Received":"""from localhost [127.0.0.1]\r | |
1933 | by localhost with POP3 (fetchmail-6.2.5)\r | |
1934 | for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",\r | |
1935 | "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])\r | |
1936 | by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746\r | |
1937 | for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",\r | |
1938 | "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)\r | |
1939 | id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",\r | |
1940 | "Date":"Wed, 13 Jul 2005 17:23:11 -0400",\r | |
1941 | "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",\r | |
1942 | "To":"gkj@gregorykjohnson.com",\r | |
1943 | "Subject":"Sample message",\r | |
1944 | "Mime-Version":"1.0",\r | |
1945 | "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",\r | |
1946 | "Content-Disposition":"inline",\r | |
1947 | "User-Agent": "Mutt/1.5.9i" }\r | |
1948 | \r | |
1949 | _sample_payloads = ("""This is a sample message.\r | |
1950 | \r | |
1951 | --\r | |
1952 | Gregory K. Johnson\r | |
1953 | """,\r | |
1954 | """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs\r | |
1955 | 3FYlAAAA\r | |
1956 | """)\r | |
1957 | \r | |
1958 | \r | |
1959 | def test_main():\r | |
1960 | tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,\r | |
1961 | TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,\r | |
1962 | TestMHMessage, TestBabylMessage, TestMMDFMessage,\r | |
1963 | TestMessageConversion, TestProxyFile, TestPartialFile,\r | |
1964 | MaildirTestCase)\r | |
1965 | test_support.run_unittest(*tests)\r | |
1966 | test_support.reap_children()\r | |
1967 | \r | |
1968 | \r | |
1969 | if __name__ == '__main__':\r | |
1970 | test_main()\r |