]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | # -*- coding: iso-8859-1 -*-\r |
2 | # Copyright (C) 2001,2002 Python Software Foundation\r | |
3 | # csv package unit tests\r | |
4 | \r | |
5 | import sys\r | |
6 | import os\r | |
7 | import unittest\r | |
8 | from StringIO import StringIO\r | |
9 | import tempfile\r | |
10 | import csv\r | |
11 | import gc\r | |
12 | import io\r | |
13 | from test import test_support\r | |
14 | \r | |
class Test_Csv(unittest.TestCase):
    """
    Exercise the underlying C csv parser in ways that do not fit the
    high level interface.  More tests of this kind are found in
    TestDialectRegistry.
    """

    def _test_arg_valid(self, ctor, arg):
        # Each entry is (expected exception, positional args, keyword args)
        # describing one call to ctor that has to be rejected.
        rejected_calls = (
            (TypeError, (), {}),
            (TypeError, (None,), {}),
            (TypeError, (arg,), {'bad_attr': 0}),
            (TypeError, (arg,), {'delimiter': 0}),
            (TypeError, (arg,), {'delimiter': 'XX'}),
            (csv.Error, (arg, 'foo'), {}),
            (TypeError, (arg,), {'delimiter': None}),
            (TypeError, (arg,), {'delimiter': 1}),
            (TypeError, (arg,), {'quotechar': 1}),
            (TypeError, (arg,), {'lineterminator': None}),
            (TypeError, (arg,), {'lineterminator': 1}),
            (TypeError, (arg,), {'quoting': None}),
            (TypeError, (arg,), {'quoting': csv.QUOTE_ALL, 'quotechar': ''}),
            (TypeError, (arg,), {'quoting': csv.QUOTE_ALL, 'quotechar': None}),
        )
        for error, positional, keywords in rejected_calls:
            self.assertRaises(error, ctor, *positional, **keywords)

    def test_reader_arg_valid(self):
        self._test_arg_valid(csv.reader, [])

    def test_writer_arg_valid(self):
        self._test_arg_valid(csv.writer, StringIO())

    def _test_default_attrs(self, ctor, *args):
        obj = ctor(*args)
        # Check defaults
        defaults = (
            ('delimiter', ','),
            ('doublequote', True),
            ('escapechar', None),
            ('lineterminator', "\r\n"),
            ('quotechar', '"'),
            ('quoting', csv.QUOTE_MINIMAL),
            ('skipinitialspace', False),
            ('strict', False),
        )
        for attr, value in defaults:
            self.assertEqual(getattr(obj.dialect, attr), value)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(TypeError, delattr, obj.dialect, 'delimiter')
        self.assertRaises(TypeError, setattr, obj.dialect, 'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)

    def test_reader_attrs(self):
        self._test_default_attrs(csv.reader, [])

    def test_writer_attrs(self):
        self._test_default_attrs(csv.writer, StringIO())

    def _test_kw_attrs(self, ctor, *args):
        # Now try with alternate options: pass every formatting parameter
        # as a keyword and check that the dialect reports it back.
        options = (
            ('delimiter', ':'),
            ('doublequote', False),
            ('escapechar', '\\'),
            ('lineterminator', '\r'),
            ('quotechar', '*'),
            ('quoting', csv.QUOTE_NONE),
            ('skipinitialspace', True),
            ('strict', True),
        )
        obj = ctor(*args, **dict(options))
        for attr, value in options:
            self.assertEqual(getattr(obj.dialect, attr), value)

    def test_reader_kw_attrs(self):
        self._test_kw_attrs(csv.reader, [])

    def test_writer_kw_attrs(self):
        self._test_kw_attrs(csv.writer, StringIO())

    def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter = '-'
            doublequote = False
            escapechar = '^'
            lineterminator = '$'
            quotechar = '#'
            quoting = csv.QUOTE_ALL
            skipinitialspace = True
            strict = False
        obj = ctor(*(args + (dialect,)))
        expected = (
            ('delimiter', '-'),
            ('doublequote', False),
            ('escapechar', '^'),
            ('lineterminator', "$"),
            ('quotechar', '#'),
            ('quoting', csv.QUOTE_ALL),
            ('skipinitialspace', True),
            ('strict', False),
        )
        for attr, value in expected:
            self.assertEqual(getattr(obj.dialect, attr), value)

    def test_reader_dialect_attrs(self):
        self._test_dialect_attrs(csv.reader, [])

    def test_writer_dialect_attrs(self):
        self._test_dialect_attrs(csv.writer, StringIO())


    def _write_test(self, fields, expect, **kwargs):
        # Write one row to a scratch file and compare what landed on disk
        # with expect followed by the dialect's line terminator.
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            writer = csv.writer(scratch, **kwargs)
            writer.writerow(fields)
            scratch.seek(0)
            written = scratch.read()
            self.assertEqual(written, expect + writer.dialect.lineterminator)
        finally:
            scratch.close()
            os.unlink(path)

    def test_write_arg_valid(self):
        self.assertRaises(csv.Error, self._write_test, None, '')
        self._write_test((), '')
        self._write_test([None], '""')
        self.assertRaises(csv.Error, self._write_test,
                          [None], None, quoting=csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        class BadList:
            def __len__(self):
                return 10
            def __getitem__(self, i):
                if i > 2:
                    raise IOError
        self.assertRaises(IOError, self._write_test, BadList(), '')
        class BadItem:
            def __str__(self):
                raise IOError
        self.assertRaises(IOError, self._write_test, [BadItem()], '')

    def test_write_bigfield(self):
        # This exercises the buffer realloc functionality
        bigstring = 'X' * 50000
        joined = '%s,%s' % (bigstring, bigstring)
        self._write_test([bigstring, bigstring], joined)

    def test_write_quoting(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,q'], 'a,1,p,q',
                          quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         quoting=csv.QUOTE_MINIMAL)
        self._write_test(['a', 1, 'p,q'], '"a",1,"p,q"',
                         quoting=csv.QUOTE_NONNUMERIC)
        self._write_test(['a', 1, 'p,q'], '"a","1","p,q"',
                         quoting=csv.QUOTE_ALL)
        self._write_test(['a\nb', 1], '"a\nb","1"',
                         quoting=csv.QUOTE_ALL)

    def test_write_escape(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self.assertRaises(csv.Error,
                          self._write_test,
                          ['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                          escapechar=None, doublequote=False)
        self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)

    def test_writerows(self):
        # An error raised by the file's write() must reach the caller.
        class BrokenFile:
            def write(self, buf):
                raise IOError
        broken_writer = csv.writer(BrokenFile())
        self.assertRaises(IOError, broken_writer.writerows, [['a']])
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            writer = csv.writer(scratch)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a', 'b'], ['c', 'd']])
            scratch.seek(0)
            self.assertEqual(scratch.read(), "a,b\r\nc,d\r\n")
        finally:
            scratch.close()
            os.unlink(path)

    def _read_test(self, input, expect, **kwargs):
        # Parse input completely and compare the resulting rows with expect.
        rows = list(csv.reader(input, **kwargs))
        self.assertEqual(rows, expect)

    def test_read_oddinputs(self):
        self._read_test([], [])
        self._read_test([''], [[]])
        self.assertRaises(csv.Error, self._read_test,
                          ['"ab"c'], None, strict=1)
        # cannot handle null bytes for the moment
        self.assertRaises(csv.Error, self._read_test,
                          ['ab\0c'], None, strict=1)
        self._read_test(['"ab"c'], [['abc']], doublequote=0)

    def test_read_eol(self):
        # A single trailing line ending (or none) is accepted ...
        for line in ('a,b', 'a,b\n', 'a,b\r\n', 'a,b\r'):
            self._read_test([line], [['a', 'b']])
        # ... but a line ending in the middle of an input line is an error.
        for line in ('a,b\rc,d', 'a,b\nc,d', 'a,b\r\nc,d'):
            self.assertRaises(csv.Error, self._read_test, [line], [])

    def test_read_escape(self):
        cases = (
            ('a,\\b,c', ['a', 'b', 'c']),
            ('a,b\\,c', ['a', 'b,c']),
            ('a,"b\\,c"', ['a', 'b,c']),
            ('a,"b,\\c"', ['a', 'b,c']),
            ('a,"b,c\\""', ['a', 'b,c"']),
            ('a,"b,c"\\', ['a', 'b,c\\']),
        )
        for line, fields in cases:
            self._read_test([line], [fields], escapechar='\\')

    def test_read_quoting(self):
        self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quotechar=None, escapechar='\\')
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quoting=csv.QUOTE_NONE, escapechar='\\')
        # will this fail where locale uses comma for decimals?
        self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                        quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
        self.assertRaises(ValueError, self._read_test,
                          ['abc,3'], [[]],
                          quoting=csv.QUOTE_NONNUMERIC)

    def test_read_bigfield(self):
        # This exercises the buffer realloc functionality and field size
        # limits.
        saved_limit = csv.field_size_limit()
        try:
            size = 50000
            bigstring = 'X' * size
            bigline = '%s,%s' % (bigstring, bigstring)
            parsed = [[bigstring, bigstring]]
            self._read_test([bigline], parsed)
            csv.field_size_limit(size)
            self._read_test([bigline], parsed)
            self.assertEqual(csv.field_size_limit(), size)
            csv.field_size_limit(size - 1)
            self.assertRaises(csv.Error, self._read_test, [bigline], [])
            self.assertRaises(TypeError, csv.field_size_limit, None)
            self.assertRaises(TypeError, csv.field_size_limit, 1, None)
        finally:
            # Put back whatever limit was in force before the test.
            csv.field_size_limit(saved_limit)

    def test_read_linenum(self):
        lines = ['line,1', 'line,2', 'line,3']
        readers = (csv.reader(lines),
                   csv.DictReader(lines, fieldnames=['a', 'b', 'c']))
        for r in readers:
            self.assertEqual(r.line_num, 0)
            for seen in (1, 2, 3):
                next(r)
                self.assertEqual(r.line_num, seen)
            self.assertRaises(StopIteration, next, r)
            self.assertEqual(r.line_num, 3)

    def test_roundtrip_quoteed_newlines(self):
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            writer = csv.writer(scratch)
            self.assertRaises(TypeError, writer.writerows, None)
            rows = [['a\nb', 'b'], ['c', 'x\r\nd']]
            writer.writerows(rows)
            scratch.seek(0)
            for index, row in enumerate(csv.reader(scratch)):
                self.assertEqual(row, rows[index])
        finally:
            scratch.close()
            os.unlink(path)
303 | \r | |
class TestDialectRegistry(unittest.TestCase):
    def test_registry_badargs(self):
        # Each entry is (expected exception, function, positional args,
        # keyword args) for one registry call that has to fail.
        bad_calls = (
            (TypeError, csv.list_dialects, (None,), {}),
            (TypeError, csv.get_dialect, (), {}),
            (csv.Error, csv.get_dialect, (None,), {}),
            (csv.Error, csv.get_dialect, ("nonesuch",), {}),
            (TypeError, csv.unregister_dialect, (), {}),
            (csv.Error, csv.unregister_dialect, (None,), {}),
            (csv.Error, csv.unregister_dialect, ("nonesuch",), {}),
            (TypeError, csv.register_dialect, (None,), {}),
            (TypeError, csv.register_dialect, (None, None), {}),
            (TypeError, csv.register_dialect, ("nonesuch", 0, 0), {}),
            (TypeError, csv.register_dialect, ("nonesuch",),
             {'badargument': None}),
            (TypeError, csv.register_dialect, ("nonesuch",),
             {'quoting': None}),
            (TypeError, csv.register_dialect, ([],), {}),
        )
        for error, func, positional, keywords in bad_calls:
            self.assertRaises(error, func, *positional, **keywords)

    def test_registry(self):
        class myexceltsv(csv.excel):
            delimiter = "\t"
        key = "myexceltsv"
        # Work out the expected listing before the dialect is registered.
        expected_dialects = sorted(csv.list_dialects() + [key])
        csv.register_dialect(key, myexceltsv)
        self.addCleanup(csv.unregister_dialect, key)
        self.assertEqual(csv.get_dialect(key).delimiter, '\t')
        got_dialects = csv.list_dialects()
        got_dialects.sort()
        self.assertEqual(expected_dialects, got_dialects)

    def test_register_kwargs(self):
        key = 'fedcba'
        csv.register_dialect(key, delimiter=';')
        self.addCleanup(csv.unregister_dialect, key)
        self.assertEqual(csv.get_dialect(key).delimiter, ';')
        rows = list(csv.reader(['X;Y;Z'], key))
        self.assertEqual([['X', 'Y', 'Z']], rows)

    def test_incomplete_dialect(self):
        # Only the delimiter is set here; instantiating must raise csv.Error.
        class myexceltsv(csv.Dialect):
            delimiter = "\t"
        self.assertRaises(csv.Error, myexceltsv)

    def test_space_dialect(self):
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write("abc def\nc1ccccc1 benzene\n")
            scratch.seek(0)
            rdr = csv.reader(scratch, dialect=space())
            self.assertEqual(next(rdr), ["abc", "def"])
            self.assertEqual(next(rdr), ["c1ccccc1", "benzene"])
        finally:
            scratch.close()
            os.unlink(path)

    def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"
        class testB(csv.excel):
            delimiter = ":"
        class testC(csv.excel):
            delimiter = "|"

        def written(*writer_args, **writer_kwargs):
            # Write the row [1, 2, 3] to a fresh scratch file with a writer
            # built from the given arguments and return the file contents.
            handle, path = tempfile.mkstemp()
            scratch = os.fdopen(handle, "w+b")
            try:
                writer = csv.writer(scratch, *writer_args, **writer_kwargs)
                writer.writerow([1, 2, 3])
                scratch.seek(0)
                return scratch.read()
            finally:
                scratch.close()
                os.unlink(path)

        csv.register_dialect('testC', testC)
        try:
            # Default dialect.
            self.assertEqual(written(), "1,2,3\r\n")
            # Dialect class passed positionally.
            self.assertEqual(written(testA), "1\t2\t3\r\n")
            # Dialect instance passed by keyword.
            self.assertEqual(written(dialect=testB()), "1:2:3\r\n")
            # Registered dialect referenced by name.
            self.assertEqual(written(dialect='testC'), "1|2|3\r\n")
            # An explicit keyword takes precedence over the dialect's value.
            self.assertEqual(written(dialect=testA, delimiter=';'),
                             "1;2;3\r\n")
        finally:
            csv.unregister_dialect('testC')

    def test_bad_dialect(self):
        # Unknown parameter
        self.assertRaises(TypeError, csv.reader, [], bad_attr=0)
        # Bad values
        for bad_value in ({'delimiter': None}, {'quoting': -1},
                          {'quoting': 100}):
            self.assertRaises(TypeError, csv.reader, [], **bad_value)
439 | \r | |
class TestCsvBase(unittest.TestCase):
    # Shared helpers for dialect test cases.  Both methods read
    # self.dialect, which this class does not define itself.

    def readerAssertEqual(self, input, expected_result):
        # Store input in a scratch file, parse it back with self.dialect
        # and compare the parsed rows with expected_result.
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write(input)
            scratch.seek(0)
            rows = list(csv.reader(scratch, dialect=self.dialect))
            self.assertEqual(rows, expected_result)
        finally:
            scratch.close()
            os.unlink(path)

    def writerAssertEqual(self, input, expected_result):
        # Write the rows in input to a scratch file with self.dialect and
        # compare the raw file contents with expected_result.
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            csv.writer(scratch, dialect=self.dialect).writerows(input)
            scratch.seek(0)
            self.assertEqual(scratch.read(), expected_result)
        finally:
            scratch.close()
            os.unlink(path)
465 | \r | |
class TestDialectExcel(TestCsvBase):
    # Reader and writer checks run against the registered 'excel' dialect.
    dialect = 'excel'

    def test_single(self):
        self.readerAssertEqual('abc', [['abc']])

    def test_simple(self):
        fields = ['1', '2', '3', '4', '5']
        self.readerAssertEqual('1,2,3,4,5', [fields])

    def test_blankline(self):
        self.readerAssertEqual('', [])

    def test_empty_fields(self):
        self.readerAssertEqual(',', [['', '']])

    def test_singlequoted(self):
        self.readerAssertEqual('""', [['']])

    def test_singlequoted_left_empty(self):
        self.readerAssertEqual('"",', [['', '']])

    def test_singlequoted_right_empty(self):
        self.readerAssertEqual(',""', [['', '']])

    def test_single_quoted_quote(self):
        self.readerAssertEqual('""""', [['"']])

    def test_quoted_quotes(self):
        self.readerAssertEqual('""""""', [['""']])

    def test_inline_quote(self):
        self.readerAssertEqual('a""b', [['a""b']])

    def test_inline_quotes(self):
        self.readerAssertEqual('a"b"c', [['a"b"c']])

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        self.readerAssertEqual('"a"b', [['ab']])

    def test_lone_quote(self):
        self.readerAssertEqual('a"b', [['a"b']])

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        self.readerAssertEqual('"a" "b"', [['a "b"']])

    def test_space_and_quote(self):
        self.readerAssertEqual(' "a"', [[' "a"']])

    def test_quoted(self):
        line = '1,2,3,"I think, therefore I am",5,6'
        fields = ['1', '2', '3', 'I think, therefore I am', '5', '6']
        self.readerAssertEqual(line, [fields])

    def test_quoted_quote(self):
        line = '1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"'
        fields = ['1', '2', '3',
                  '"I see," said the blind man',
                  'as he picked up his hammer and saw']
        self.readerAssertEqual(line, [fields])

    def test_quoted_nl(self):
        # The quoted fields span physical lines; the embedded newlines are
        # expected to come back inside the parsed field values.
        text = '''\
1,2,3,"""I see,""
said the blind man","as he picked up his
hammer and saw"
9,8,7,6'''
        first_row = ['1', '2', '3',
                     '"I see,"\nsaid the blind man',
                     'as he picked up his\nhammer and saw']
        second_row = ['9', '8', '7', '6']
        self.readerAssertEqual(text, [first_row, second_row])

    def test_dubious_quote(self):
        self.readerAssertEqual('12,12,1",', [['12', '12', '1"', '']])

    def test_null(self):
        self.writerAssertEqual([], '')

    def test_single_writer(self):
        self.writerAssertEqual([['abc']], 'abc\r\n')

    def test_simple_writer(self):
        row = [1, 2, 'abc', 3, 4]
        self.writerAssertEqual([row], '1,2,abc,3,4\r\n')

    def test_quotes(self):
        row = [1, 2, 'a"bc"', 3, 4]
        self.writerAssertEqual([row], '1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')

    def test_newlines(self):
        row = [1, 2, 'a\nbc', 3, 4]
        self.writerAssertEqual([row], '1,2,"a\nbc",3,4\r\n')
562 | \r | |
class EscapedExcel(csv.excel):
    # csv.excel variant that never quotes fields and uses a backslash as
    # the escape character.
    escapechar = '\\'
    quoting = csv.QUOTE_NONE
566 | \r | |
class TestEscapedExcel(TestCsvBase):
    # Runs the shared reader/writer helpers with an EscapedExcel instance.
    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        rows = [['abc,def']]
        self.readerAssertEqual('abc\\,def\r\n', rows)
575 | \r | |
class QuotedEscapedExcel(csv.excel):
    # csv.excel variant using QUOTE_NONNUMERIC quoting together with a
    # backslash escape character.
    escapechar = '\\'
    quoting = csv.QUOTE_NONNUMERIC
579 | \r | |
class TestQuotedEscapedExcel(TestCsvBase):
    # Runs the shared reader/writer helpers with a QuotedEscapedExcel
    # instance.
    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        rows = [['abc,def']]
        self.readerAssertEqual('"abc\\,def"\r\n', rows)
588 | \r | |
class TestDictFields(unittest.TestCase):
    ### "long" means the row is longer than the number of fieldnames
    ### "short" means there are fewer elements in the row than fieldnames
    def test_write_simple_dict(self):
        handle, path = tempfile.mkstemp()
        scratch = io.open(handle, 'w+b')
        try:
            writer = csv.DictWriter(scratch, fieldnames=["f1", "f2", "f3"])
            writer.writeheader()
            scratch.seek(0)
            self.assertEqual(scratch.readline(), "f1,f2,f3\r\n")
            # "f2" is left out of the row on purpose.
            writer.writerow({"f1": 10, "f3": "abc"})
            scratch.seek(0)
            scratch.readline()  # header
            self.assertEqual(scratch.read(), "10,,abc\r\n")
        finally:
            scratch.close()
            os.unlink(path)

    def test_write_no_fields(self):
        # DictWriter called without fieldnames must raise TypeError.
        self.assertRaises(TypeError, csv.DictWriter, StringIO())

    def test_read_dict_fields(self):
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write("1,2,abc\r\n")
            scratch.seek(0)
            reader = csv.DictReader(scratch,
                                    fieldnames=["f1", "f2", "f3"])
            expected = {"f1": '1', "f2": '2', "f3": 'abc'}
            self.assertEqual(next(reader), expected)
        finally:
            scratch.close()
            os.unlink(path)

    def test_read_dict_no_fieldnames(self):
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write("f1,f2,f3\r\n1,2,abc\r\n")
            scratch.seek(0)
            reader = csv.DictReader(scratch)
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            expected = {"f1": '1', "f2": '2', "f3": 'abc'}
            self.assertEqual(next(reader), expected)
        finally:
            scratch.close()
            os.unlink(path)

    # Two test cases to make sure existing ways of implicitly setting
    # fieldnames continue to work.  Both arise from discussion in issue3436.
    def test_read_dict_fieldnames_from_file(self):
        handle, path = tempfile.mkstemp()
        f = os.fdopen(handle, "w+b")
        try:
            f.write("f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            # The header row is consumed by a plain reader on the same file.
            header = next(csv.reader(f))
            reader = csv.DictReader(f, fieldnames=header)
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            expected = {"f1": '1', "f2": '2', "f3": 'abc'}
            self.assertEqual(next(reader), expected)
        finally:
            f.close()
            os.unlink(path)

    def test_read_dict_fieldnames_chain(self):
        import itertools
        handle, path = tempfile.mkstemp()
        f = os.fdopen(handle, "w+b")
        try:
            f.write("f1,f2,f3\r\n1,2,abc\r\n")
            f.seek(0)
            reader = csv.DictReader(f)
            first = next(reader)
            for row in itertools.chain([first], reader):
                self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
                self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})
        finally:
            f.close()
            os.unlink(path)

    def test_read_long(self):
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write("1,2,abc,4,5,6\r\n")
            scratch.seek(0)
            reader = csv.DictReader(scratch,
                                    fieldnames=["f1", "f2"])
            # No restkey is given, so the surplus values sit under None.
            expected = {"f1": '1', "f2": '2',
                        None: ["abc", "4", "5", "6"]}
            self.assertEqual(next(reader), expected)
        finally:
            scratch.close()
            os.unlink(path)

    def test_read_long_with_rest(self):
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write("1,2,abc,4,5,6\r\n")
            scratch.seek(0)
            reader = csv.DictReader(scratch,
                                    fieldnames=["f1", "f2"], restkey="_rest")
            expected = {"f1": '1', "f2": '2',
                        "_rest": ["abc", "4", "5", "6"]}
            self.assertEqual(next(reader), expected)
        finally:
            scratch.close()
            os.unlink(path)

    def test_read_long_with_rest_no_fieldnames(self):
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
            scratch.seek(0)
            reader = csv.DictReader(scratch, restkey="_rest")
            self.assertEqual(reader.fieldnames, ["f1", "f2"])
            expected = {"f1": '1', "f2": '2',
                        "_rest": ["abc", "4", "5", "6"]}
            self.assertEqual(next(reader), expected)
        finally:
            scratch.close()
            os.unlink(path)

    def test_read_short(self):
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            scratch.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
            scratch.seek(0)
            reader = csv.DictReader(scratch,
                                    fieldnames="1 2 3 4 5 6".split(),
                                    restval="DEFAULT")
            full_row = {"1": '1', "2": '2', "3": 'abc',
                        "4": '4', "5": '5', "6": '6'}
            self.assertEqual(next(reader), full_row)
            # The second row has three values; the rest take restval.
            padded_row = {"1": '1', "2": '2', "3": 'abc',
                          "4": 'DEFAULT', "5": 'DEFAULT',
                          "6": 'DEFAULT'}
            self.assertEqual(next(reader), padded_row)
        finally:
            scratch.close()
            os.unlink(path)

    def test_read_multi(self):
        sample = [
            '2147483648,43.0e12,17,abc,def\r\n',
            '147483648,43.0e2,17,abc,def\r\n',
            '47483648,43.0,170,abc,def\r\n'
            ]

        reader = csv.DictReader(sample,
                                fieldnames="i1 float i2 s1 s2".split())
        # Only the first row is checked.
        first = {"i1": '2147483648',
                 "float": '43.0e12',
                 "i2": '17',
                 "s1": 'abc',
                 "s2": 'def'}
        self.assertEqual(next(reader), first)

    def test_read_with_blanks(self):
        lines = ["1,2,abc,4,5,6\r\n", "\r\n",
                 "1,2,abc,4,5,6\r\n"]
        reader = csv.DictReader(lines,
                                fieldnames="1 2 3 4 5 6".split())
        # Two rows are expected from three input lines: the blank middle
        # line does not produce a row.
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                        "4": '4', "5": '5', "6": '6'})

    def test_read_semi_sep(self):
        reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
                                fieldnames="1 2 3 4 5 6".split(),
                                delimiter=';')
        expected = {"1": '1', "2": '2', "3": 'abc',
                    "4": '4', "5": '5', "6": '6'}
        self.assertEqual(next(reader), expected)
759 | \r | |
class TestArrayWrites(unittest.TestCase):
    def _check_array_row(self, values, expected):
        # Write values as one row with the "excel" dialect to a scratch
        # file and compare the file contents with expected.
        handle, path = tempfile.mkstemp()
        scratch = os.fdopen(handle, "w+b")
        try:
            writer = csv.writer(scratch, dialect="excel")
            writer.writerow(values)
            scratch.seek(0)
            self.assertEqual(scratch.read(), expected)
        finally:
            scratch.close()
            os.unlink(path)

    def test_int_write(self):
        import array
        contents = [(20-i) for i in range(20)]
        a = array.array('i', contents)
        expected = ",".join([str(item) for item in a]) + "\r\n"
        self._check_array_row(a, expected)

    def test_double_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('d', contents)
        expected = ",".join([str(item) for item in a]) + "\r\n"
        self._check_array_row(a, expected)

    def test_float_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('f', contents)
        expected = ",".join([str(item) for item in a]) + "\r\n"
        self._check_array_row(a, expected)

    def test_char_write(self):
        import array, string
        a = array.array('c', string.letters)
        # The array items are joined directly, with no str() conversion.
        expected = ",".join(a) + "\r\n"
        self._check_array_row(a, expected)
824 | \r | |
class TestDialectValidity(unittest.TestCase):
    """Check that csv.Dialect subclasses are validated when instantiated."""

    def _make_dialect(self):
        # Return a brand-new, valid Dialect subclass.  Each test mutates the
        # class attributes of its own copy, so a fresh class per call keeps
        # the tests independent of one another.
        class mydialect(csv.Dialect):
            delimiter = ";"
            escapechar = '\\'
            doublequote = False
            skipinitialspace = True
            lineterminator = '\r\n'
            quoting = csv.QUOTE_NONE
        return mydialect

    def test_quoting(self):
        mydialect = self._make_dialect()
        # The unmodified dialect must instantiate without raising.
        mydialect()

        mydialect.quoting = None
        self.assertRaises(csv.Error, mydialect)

        # A valid QUOTE_ALL configuration must be accepted again.
        mydialect.doublequote = True
        mydialect.quoting = csv.QUOTE_ALL
        mydialect.quotechar = '"'
        mydialect()

        # Two-character quotechar.
        mydialect.quotechar = "''"
        self.assertRaises(csv.Error, mydialect)

        # Non-string quotechar.
        mydialect.quotechar = 4
        self.assertRaises(csv.Error, mydialect)

    def test_delimiter(self):
        mydialect = self._make_dialect()
        # The unmodified dialect must instantiate without raising.
        mydialect()

        # Multi-character delimiter.
        mydialect.delimiter = ":::"
        self.assertRaises(csv.Error, mydialect)

        # Non-string delimiter.
        mydialect.delimiter = 4
        self.assertRaises(csv.Error, mydialect)

    def test_lineterminator(self):
        mydialect = self._make_dialect()
        # The unmodified dialect must instantiate without raising.
        mydialect()

        # Unlike delimiter, a multi-character lineterminator is accepted.
        mydialect.lineterminator = ":::"
        mydialect()

        # Non-string lineterminator.
        mydialect.lineterminator = 4
        self.assertRaises(csv.Error, mydialect)
881 | \r | |
882 | \r | |
class TestSniffer(unittest.TestCase):
    """Exercise csv.Sniffer: header detection and dialect guessing."""

    # Comma + space separated, unquoted, no header row.
    sample1 = """\
Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
Shark City, Glendale Heights, IL, 12/28/02, Prezence
Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
"""
    # Colon separated, single-quoted, with doubled quotes inside fields.
    sample2 = """\
'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
"""
    # Header row that can be prepended to sample1.
    header = '''\
"venue","city","state","date","performers"
'''
    # Three identical lines, so several characters are plausible delimiters.
    sample3 = '''\
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
'''

    sample4 = '''\
2147483648;43.0e12;17;abc;def
147483648;43.0e2;17;abc;def
47483648;43.0;170;abc;def
'''

    sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
    sample6 = "a|b|c\r\nd|e|f\r\n"
    sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"

    def test_has_header(self):
        detector = csv.Sniffer()
        headerless = self.sample1
        with_header = self.header+self.sample1
        self.assertEqual(detector.has_header(headerless), False)
        self.assertEqual(detector.has_header(with_header), True)

    def test_sniff(self):
        detector = csv.Sniffer()

        guess = detector.sniff(self.sample1)
        self.assertEqual(guess.delimiter, ",")
        self.assertEqual(guess.quotechar, '"')
        self.assertEqual(guess.skipinitialspace, True)

        guess = detector.sniff(self.sample2)
        self.assertEqual(guess.delimiter, ":")
        self.assertEqual(guess.quotechar, "'")
        self.assertEqual(guess.skipinitialspace, False)

    def test_delimiters(self):
        detector = csv.Sniffer()

        # Every line of sample3 is the same, so without a hint any of its
        # characters may legitimately be picked as the delimiter; only
        # membership in the sample is checked.
        guess = detector.sniff(self.sample3)
        self.assertIn(guess.delimiter, self.sample3)

        # With a restricted candidate set the choice is determined.
        for candidates, wanted in (("?,", "?"), ("/,", "/")):
            guess = detector.sniff(self.sample3, delimiters=candidates)
            self.assertEqual(guess.delimiter, wanted)

        for text, wanted in ((self.sample4, ";"),
                             (self.sample5, "\t"),
                             (self.sample6, "|")):
            guess = detector.sniff(text)
            self.assertEqual(guess.delimiter, wanted)

        guess = detector.sniff(self.sample7)
        self.assertEqual(guess.delimiter, "|")
        self.assertEqual(guess.quotechar, "'")

    def test_doublequote(self):
        detector = csv.Sniffer()
        plain = detector.sniff(self.header)
        self.assertFalse(plain.doublequote)
        doubled = detector.sniff(self.sample2)
        self.assertTrue(doubled.doublequote)
959 | \r | |
# The leak tests compare sys.gettotalrefcount() between loop iterations; the
# whole group is skipped when the interpreter does not provide that function.
if not hasattr(sys, "gettotalrefcount"):
    if test_support.verbose:
        print("*** skipping leakage tests ***")
else:
    class NUL:
        # Minimal write-only sink: accepts and discards anything written.
        def write(s, *args):
            pass
        writelines = write

    class TestLeaks(unittest.TestCase):
        # Each test repeats an operation 20 times and records how much the
        # total reference count grew between the starts of the last two
        # iterations.  assertLess is used so that a failure reports the
        # actual delta instead of "False != True".

        def test_create_read(self):
            delta = 0
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                csv.reader(["a,b,c\r\n"])
                delta = rc-lastrc
                lastrc = rc
            # if csv.reader() leaks, last delta should be 3 or more
            self.assertLess(delta, 3)

        def test_create_write(self):
            delta = 0
            lastrc = sys.gettotalrefcount()
            s = NUL()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                csv.writer(s)
                csv.writer(s)
                csv.writer(s)
                delta = rc-lastrc
                lastrc = rc
            # if csv.writer() leaks, last delta should be 3 or more
            self.assertLess(delta, 3)

        def test_read(self):
            delta = 0
            rows = ["a,b,c\r\n"]*5
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                rdr = csv.reader(rows)
                for row in rdr:
                    pass
                delta = rc-lastrc
                lastrc = rc
            # if reader leaks during read, delta should be 5 or more
            self.assertLess(delta, 5)

        def test_write(self):
            delta = 0
            rows = [[1,2,3]]*5
            s = NUL()
            lastrc = sys.gettotalrefcount()
            for i in xrange(20):
                gc.collect()
                self.assertEqual(gc.garbage, [])
                rc = sys.gettotalrefcount()
                writer = csv.writer(s)
                for row in rows:
                    writer.writerow(row)
                delta = rc-lastrc
                lastrc = rc
            # if writer leaks during write, last delta should be 5 or more
            self.assertLess(delta, 5)
1032 | \r | |
1033 | # commented out for now - csv module doesn't yet support Unicode\r | |
1034 | ## class TestUnicode(unittest.TestCase):\r | |
1035 | ## def test_unicode_read(self):\r | |
1036 | ## import codecs\r | |
1037 |