]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | # As a test suite for the os module, this is woefully inadequate, but this\r |
2 | # does add tests for a few functions which have been determined to be more\r | |
3 | # portable than they had been thought to be.\r | |
4 | \r | |
5 | import os\r | |
6 | import errno\r | |
7 | import unittest\r | |
8 | import warnings\r | |
9 | import sys\r | |
10 | import signal\r | |
11 | import subprocess\r | |
12 | import time\r | |
13 | from test import test_support\r | |
14 | import mmap\r | |
15 | import uuid\r | |
16 | \r | |
17 | warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)\r | |
18 | warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)\r | |
19 | \r | |
20 | # Tests creating TESTFN\r | |
21 | class FileTests(unittest.TestCase):\r | |
22 | def setUp(self):\r | |
23 | if os.path.exists(test_support.TESTFN):\r | |
24 | os.unlink(test_support.TESTFN)\r | |
25 | tearDown = setUp\r | |
26 | \r | |
27 | def test_access(self):\r | |
28 | f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)\r | |
29 | os.close(f)\r | |
30 | self.assertTrue(os.access(test_support.TESTFN, os.W_OK))\r | |
31 | \r | |
32 | def test_closerange(self):\r | |
33 | first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)\r | |
34 | # We must allocate two consecutive file descriptors, otherwise\r | |
35 | # it will mess up other file descriptors (perhaps even the three\r | |
36 | # standard ones).\r | |
37 | second = os.dup(first)\r | |
38 | try:\r | |
39 | retries = 0\r | |
40 | while second != first + 1:\r | |
41 | os.close(first)\r | |
42 | retries += 1\r | |
43 | if retries > 10:\r | |
44 | # XXX test skipped\r | |
45 | self.skipTest("couldn't allocate two consecutive fds")\r | |
46 | first, second = second, os.dup(second)\r | |
47 | finally:\r | |
48 | os.close(second)\r | |
49 | # close a fd that is open, and one that isn't\r | |
50 | os.closerange(first, first + 2)\r | |
51 | self.assertRaises(OSError, os.write, first, "a")\r | |
52 | \r | |
53 | @test_support.cpython_only\r | |
54 | def test_rename(self):\r | |
55 | path = unicode(test_support.TESTFN)\r | |
56 | old = sys.getrefcount(path)\r | |
57 | self.assertRaises(TypeError, os.rename, path, 0)\r | |
58 | new = sys.getrefcount(path)\r | |
59 | self.assertEqual(old, new)\r | |
60 | \r | |
61 | \r | |
62 | class TemporaryFileTests(unittest.TestCase):\r | |
63 | def setUp(self):\r | |
64 | self.files = []\r | |
65 | os.mkdir(test_support.TESTFN)\r | |
66 | \r | |
67 | def tearDown(self):\r | |
68 | for name in self.files:\r | |
69 | os.unlink(name)\r | |
70 | os.rmdir(test_support.TESTFN)\r | |
71 | \r | |
72 | def check_tempfile(self, name):\r | |
73 | # make sure it doesn't already exist:\r | |
74 | self.assertFalse(os.path.exists(name),\r | |
75 | "file already exists for temporary file")\r | |
76 | # make sure we can create the file\r | |
77 | open(name, "w")\r | |
78 | self.files.append(name)\r | |
79 | \r | |
80 | def test_tempnam(self):\r | |
81 | if not hasattr(os, "tempnam"):\r | |
82 | return\r | |
83 | with warnings.catch_warnings():\r | |
84 | warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,\r | |
85 | r"test_os$")\r | |
86 | warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)\r | |
87 | self.check_tempfile(os.tempnam())\r | |
88 | \r | |
89 | name = os.tempnam(test_support.TESTFN)\r | |
90 | self.check_tempfile(name)\r | |
91 | \r | |
92 | name = os.tempnam(test_support.TESTFN, "pfx")\r | |
93 | self.assertTrue(os.path.basename(name)[:3] == "pfx")\r | |
94 | self.check_tempfile(name)\r | |
95 | \r | |
96 | def test_tmpfile(self):\r | |
97 | if not hasattr(os, "tmpfile"):\r | |
98 | return\r | |
99 | # As with test_tmpnam() below, the Windows implementation of tmpfile()\r | |
100 | # attempts to create a file in the root directory of the current drive.\r | |
101 | # On Vista and Server 2008, this test will always fail for normal users\r | |
102 | # as writing to the root directory requires elevated privileges. With\r | |
103 | # XP and below, the semantics of tmpfile() are the same, but the user\r | |
104 | # running the test is more likely to have administrative privileges on\r | |
105 | # their account already. If that's the case, then os.tmpfile() should\r | |
106 | # work. In order to make this test as useful as possible, rather than\r | |
107 | # trying to detect Windows versions or whether or not the user has the\r | |
108 | # right permissions, just try and create a file in the root directory\r | |
109 | # and see if it raises a 'Permission denied' OSError. If it does, then\r | |
110 | # test that a subsequent call to os.tmpfile() raises the same error. If\r | |
111 | # it doesn't, assume we're on XP or below and the user running the test\r | |
112 | # has administrative privileges, and proceed with the test as normal.\r | |
113 | with warnings.catch_warnings():\r | |
114 | warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)\r | |
115 | \r | |
116 | if sys.platform == 'win32':\r | |
117 | name = '\\python_test_os_test_tmpfile.txt'\r | |
118 | if os.path.exists(name):\r | |
119 | os.remove(name)\r | |
120 | try:\r | |
121 | fp = open(name, 'w')\r | |
122 | except IOError, first:\r | |
123 | # open() failed, assert tmpfile() fails in the same way.\r | |
124 | # Although open() raises an IOError and os.tmpfile() raises an\r | |
125 | # OSError(), 'args' will be (13, 'Permission denied') in both\r | |
126 | # cases.\r | |
127 | try:\r | |
128 | fp = os.tmpfile()\r | |
129 | except OSError, second:\r | |
130 | self.assertEqual(first.args, second.args)\r | |
131 | else:\r | |
132 | self.fail("expected os.tmpfile() to raise OSError")\r | |
133 | return\r | |
134 | else:\r | |
135 | # open() worked, therefore, tmpfile() should work. Close our\r | |
136 | # dummy file and proceed with the test as normal.\r | |
137 | fp.close()\r | |
138 | os.remove(name)\r | |
139 | \r | |
140 | fp = os.tmpfile()\r | |
141 | fp.write("foobar")\r | |
142 | fp.seek(0,0)\r | |
143 | s = fp.read()\r | |
144 | fp.close()\r | |
145 | self.assertTrue(s == "foobar")\r | |
146 | \r | |
147 | def test_tmpnam(self):\r | |
148 | if not hasattr(os, "tmpnam"):\r | |
149 | return\r | |
150 | with warnings.catch_warnings():\r | |
151 | warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,\r | |
152 | r"test_os$")\r | |
153 | warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)\r | |
154 | \r | |
155 | name = os.tmpnam()\r | |
156 | if sys.platform in ("win32",):\r | |
157 | # The Windows tmpnam() seems useless. From the MS docs:\r | |
158 | #\r | |
159 | # The character string that tmpnam creates consists of\r | |
160 | # the path prefix, defined by the entry P_tmpdir in the\r | |
161 | # file STDIO.H, followed by a sequence consisting of the\r | |
162 | # digit characters '0' through '9'; the numerical value\r | |
163 | # of this string is in the range 1 - 65,535. Changing the\r | |
164 | # definitions of L_tmpnam or P_tmpdir in STDIO.H does not\r | |
165 | # change the operation of tmpnam.\r | |
166 | #\r | |
167 | # The really bizarre part is that, at least under MSVC6,\r | |
168 | # P_tmpdir is "\\". That is, the path returned refers to\r | |
169 | # the root of the current drive. That's a terrible place to\r | |
170 | # put temp files, and, depending on privileges, the user\r | |
171 | # may not even be able to open a file in the root directory.\r | |
172 | self.assertFalse(os.path.exists(name),\r | |
173 | "file already exists for temporary file")\r | |
174 | else:\r | |
175 | self.check_tempfile(name)\r | |
176 | \r | |
177 | # Test attributes on return values from os.*stat* family.\r | |
178 | class StatAttributeTests(unittest.TestCase):\r | |
179 | def setUp(self):\r | |
180 | os.mkdir(test_support.TESTFN)\r | |
181 | self.fname = os.path.join(test_support.TESTFN, "f1")\r | |
182 | f = open(self.fname, 'wb')\r | |
183 | f.write("ABC")\r | |
184 | f.close()\r | |
185 | \r | |
186 | def tearDown(self):\r | |
187 | os.unlink(self.fname)\r | |
188 | os.rmdir(test_support.TESTFN)\r | |
189 | \r | |
190 | def test_stat_attributes(self):\r | |
191 | if not hasattr(os, "stat"):\r | |
192 | return\r | |
193 | \r | |
194 | import stat\r | |
195 | result = os.stat(self.fname)\r | |
196 | \r | |
197 | # Make sure direct access works\r | |
198 | self.assertEqual(result[stat.ST_SIZE], 3)\r | |
199 | self.assertEqual(result.st_size, 3)\r | |
200 | \r | |
201 | # Make sure all the attributes are there\r | |
202 | members = dir(result)\r | |
203 | for name in dir(stat):\r | |
204 | if name[:3] == 'ST_':\r | |
205 | attr = name.lower()\r | |
206 | if name.endswith("TIME"):\r | |
207 | def trunc(x): return int(x)\r | |
208 | else:\r | |
209 | def trunc(x): return x\r | |
210 | self.assertEqual(trunc(getattr(result, attr)),\r | |
211 | result[getattr(stat, name)])\r | |
212 | self.assertIn(attr, members)\r | |
213 | \r | |
214 | try:\r | |
215 | result[200]\r | |
216 | self.fail("No exception thrown")\r | |
217 | except IndexError:\r | |
218 | pass\r | |
219 | \r | |
220 | # Make sure that assignment fails\r | |
221 | try:\r | |
222 | result.st_mode = 1\r | |
223 | self.fail("No exception thrown")\r | |
224 | except (AttributeError, TypeError):\r | |
225 | pass\r | |
226 | \r | |
227 | try:\r | |
228 | result.st_rdev = 1\r | |
229 | self.fail("No exception thrown")\r | |
230 | except (AttributeError, TypeError):\r | |
231 | pass\r | |
232 | \r | |
233 | try:\r | |
234 | result.parrot = 1\r | |
235 | self.fail("No exception thrown")\r | |
236 | except AttributeError:\r | |
237 | pass\r | |
238 | \r | |
239 | # Use the stat_result constructor with a too-short tuple.\r | |
240 | try:\r | |
241 | result2 = os.stat_result((10,))\r | |
242 | self.fail("No exception thrown")\r | |
243 | except TypeError:\r | |
244 | pass\r | |
245 | \r | |
246 | # Use the constructor with a too-long tuple.\r | |
247 | try:\r | |
248 | result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))\r | |
249 | except TypeError:\r | |
250 | pass\r | |
251 | \r | |
252 | \r | |
253 | def test_statvfs_attributes(self):\r | |
254 | if not hasattr(os, "statvfs"):\r | |
255 | return\r | |
256 | \r | |
257 | try:\r | |
258 | result = os.statvfs(self.fname)\r | |
259 | except OSError, e:\r | |
260 | # On AtheOS, glibc always returns ENOSYS\r | |
261 | if e.errno == errno.ENOSYS:\r | |
262 | return\r | |
263 | \r | |
264 | # Make sure direct access works\r | |
265 | self.assertEqual(result.f_bfree, result[3])\r | |
266 | \r | |
267 | # Make sure all the attributes are there.\r | |
268 | members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',\r | |
269 | 'ffree', 'favail', 'flag', 'namemax')\r | |
270 | for value, member in enumerate(members):\r | |
271 | self.assertEqual(getattr(result, 'f_' + member), result[value])\r | |
272 | \r | |
273 | # Make sure that assignment really fails\r | |
274 | try:\r | |
275 | result.f_bfree = 1\r | |
276 | self.fail("No exception thrown")\r | |
277 | except TypeError:\r | |
278 | pass\r | |
279 | \r | |
280 | try:\r | |
281 | result.parrot = 1\r | |
282 | self.fail("No exception thrown")\r | |
283 | except AttributeError:\r | |
284 | pass\r | |
285 | \r | |
286 | # Use the constructor with a too-short tuple.\r | |
287 | try:\r | |
288 | result2 = os.statvfs_result((10,))\r | |
289 | self.fail("No exception thrown")\r | |
290 | except TypeError:\r | |
291 | pass\r | |
292 | \r | |
293 | # Use the constructor with a too-long tuple.\r | |
294 | try:\r | |
295 | result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))\r | |
296 | except TypeError:\r | |
297 | pass\r | |
298 | \r | |
299 | def test_utime_dir(self):\r | |
300 | delta = 1000000\r | |
301 | st = os.stat(test_support.TESTFN)\r | |
302 | # round to int, because some systems may support sub-second\r | |
303 | # time stamps in stat, but not in utime.\r | |
304 | os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))\r | |
305 | st2 = os.stat(test_support.TESTFN)\r | |
306 | self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))\r | |
307 | \r | |
308 | # Restrict test to Win32, since there is no guarantee other\r | |
309 | # systems support centiseconds\r | |
310 | if sys.platform == 'win32':\r | |
311 | def get_file_system(path):\r | |
312 | root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'\r | |
313 | import ctypes\r | |
314 | kernel32 = ctypes.windll.kernel32\r | |
315 | buf = ctypes.create_string_buffer("", 100)\r | |
316 | if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):\r | |
317 | return buf.value\r | |
318 | \r | |
319 | if get_file_system(test_support.TESTFN) == "NTFS":\r | |
320 | def test_1565150(self):\r | |
321 | t1 = 1159195039.25\r | |
322 | os.utime(self.fname, (t1, t1))\r | |
323 | self.assertEqual(os.stat(self.fname).st_mtime, t1)\r | |
324 | \r | |
325 | def test_large_time(self):\r | |
326 | t1 = 5000000000 # some day in 2128\r | |
327 | os.utime(self.fname, (t1, t1))\r | |
328 | self.assertEqual(os.stat(self.fname).st_mtime, t1)\r | |
329 | \r | |
330 | def test_1686475(self):\r | |
331 | # Verify that an open file can be stat'ed\r | |
332 | try:\r | |
333 | os.stat(r"c:\pagefile.sys")\r | |
334 | except WindowsError, e:\r | |
335 | if e.errno == 2: # file does not exist; cannot run test\r | |
336 | return\r | |
337 | self.fail("Could not stat pagefile.sys")\r | |
338 | \r | |
339 | from test import mapping_tests\r | |
340 | \r | |
341 | class EnvironTests(mapping_tests.BasicTestMappingProtocol):\r | |
342 | """check that os.environ object conform to mapping protocol"""\r | |
343 | type2test = None\r | |
344 | def _reference(self):\r | |
345 | return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}\r | |
346 | def _empty_mapping(self):\r | |
347 | os.environ.clear()\r | |
348 | return os.environ\r | |
349 | def setUp(self):\r | |
350 | self.__save = dict(os.environ)\r | |
351 | os.environ.clear()\r | |
352 | def tearDown(self):\r | |
353 | os.environ.clear()\r | |
354 | os.environ.update(self.__save)\r | |
355 | \r | |
356 | # Bug 1110478\r | |
357 | def test_update2(self):\r | |
358 | if os.path.exists("/bin/sh"):\r | |
359 | os.environ.update(HELLO="World")\r | |
360 | with os.popen("/bin/sh -c 'echo $HELLO'") as popen:\r | |
361 | value = popen.read().strip()\r | |
362 | self.assertEqual(value, "World")\r | |
363 | \r | |
364 | class WalkTests(unittest.TestCase):\r | |
365 | """Tests for os.walk()."""\r | |
366 | \r | |
367 | def test_traversal(self):\r | |
368 | import os\r | |
369 | from os.path import join\r | |
370 | \r | |
371 | # Build:\r | |
372 | # TESTFN/\r | |
373 | # TEST1/ a file kid and two directory kids\r | |
374 | # tmp1\r | |
375 | # SUB1/ a file kid and a directory kid\r | |
376 | # tmp2\r | |
377 | # SUB11/ no kids\r | |
378 | # SUB2/ a file kid and a dirsymlink kid\r | |
379 | # tmp3\r | |
380 | # link/ a symlink to TESTFN.2\r | |
381 | # TEST2/\r | |
382 | # tmp4 a lone file\r | |
383 | walk_path = join(test_support.TESTFN, "TEST1")\r | |
384 | sub1_path = join(walk_path, "SUB1")\r | |
385 | sub11_path = join(sub1_path, "SUB11")\r | |
386 | sub2_path = join(walk_path, "SUB2")\r | |
387 | tmp1_path = join(walk_path, "tmp1")\r | |
388 | tmp2_path = join(sub1_path, "tmp2")\r | |
389 | tmp3_path = join(sub2_path, "tmp3")\r | |
390 | link_path = join(sub2_path, "link")\r | |
391 | t2_path = join(test_support.TESTFN, "TEST2")\r | |
392 | tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")\r | |
393 | \r | |
394 | # Create stuff.\r | |
395 | os.makedirs(sub11_path)\r | |
396 | os.makedirs(sub2_path)\r | |
397 | os.makedirs(t2_path)\r | |
398 | for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:\r | |
399 | f = file(path, "w")\r | |
400 | f.write("I'm " + path + " and proud of it. Blame test_os.\n")\r | |
401 | f.close()\r | |
402 | if hasattr(os, "symlink"):\r | |
403 | os.symlink(os.path.abspath(t2_path), link_path)\r | |
404 | sub2_tree = (sub2_path, ["link"], ["tmp3"])\r | |
405 | else:\r | |
406 | sub2_tree = (sub2_path, [], ["tmp3"])\r | |
407 | \r | |
408 | # Walk top-down.\r | |
409 | all = list(os.walk(walk_path))\r | |
410 | self.assertEqual(len(all), 4)\r | |
411 | # We can't know which order SUB1 and SUB2 will appear in.\r | |
412 | # Not flipped: TESTFN, SUB1, SUB11, SUB2\r | |
413 | # flipped: TESTFN, SUB2, SUB1, SUB11\r | |
414 | flipped = all[0][1][0] != "SUB1"\r | |
415 | all[0][1].sort()\r | |
416 | self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))\r | |
417 | self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))\r | |
418 | self.assertEqual(all[2 + flipped], (sub11_path, [], []))\r | |
419 | self.assertEqual(all[3 - 2 * flipped], sub2_tree)\r | |
420 | \r | |
421 | # Prune the search.\r | |
422 | all = []\r | |
423 | for root, dirs, files in os.walk(walk_path):\r | |
424 | all.append((root, dirs, files))\r | |
425 | # Don't descend into SUB1.\r | |
426 | if 'SUB1' in dirs:\r | |
427 | # Note that this also mutates the dirs we appended to all!\r | |
428 | dirs.remove('SUB1')\r | |
429 | self.assertEqual(len(all), 2)\r | |
430 | self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))\r | |
431 | self.assertEqual(all[1], sub2_tree)\r | |
432 | \r | |
433 | # Walk bottom-up.\r | |
434 | all = list(os.walk(walk_path, topdown=False))\r | |
435 | self.assertEqual(len(all), 4)\r | |
436 | # We can't know which order SUB1 and SUB2 will appear in.\r | |
437 | # Not flipped: SUB11, SUB1, SUB2, TESTFN\r | |
438 | # flipped: SUB2, SUB11, SUB1, TESTFN\r | |
439 | flipped = all[3][1][0] != "SUB1"\r | |
440 | all[3][1].sort()\r | |
441 | self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))\r | |
442 | self.assertEqual(all[flipped], (sub11_path, [], []))\r | |
443 | self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))\r | |
444 | self.assertEqual(all[2 - 2 * flipped], sub2_tree)\r | |
445 | \r | |
446 | if hasattr(os, "symlink"):\r | |
447 | # Walk, following symlinks.\r | |
448 | for root, dirs, files in os.walk(walk_path, followlinks=True):\r | |
449 | if root == link_path:\r | |
450 | self.assertEqual(dirs, [])\r | |
451 | self.assertEqual(files, ["tmp4"])\r | |
452 | break\r | |
453 | else:\r | |
454 | self.fail("Didn't follow symlink with followlinks=True")\r | |
455 | \r | |
456 | def tearDown(self):\r | |
457 | # Tear everything down. This is a decent use for bottom-up on\r | |
458 | # Windows, which doesn't have a recursive delete command. The\r | |
459 | # (not so) subtlety is that rmdir will fail unless the dir's\r | |
460 | # kids are removed first, so bottom up is essential.\r | |
461 | for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):\r | |
462 | for name in files:\r | |
463 | os.remove(os.path.join(root, name))\r | |
464 | for name in dirs:\r | |
465 | dirname = os.path.join(root, name)\r | |
466 | if not os.path.islink(dirname):\r | |
467 | os.rmdir(dirname)\r | |
468 | else:\r | |
469 | os.remove(dirname)\r | |
470 | os.rmdir(test_support.TESTFN)\r | |
471 | \r | |
472 | class MakedirTests (unittest.TestCase):\r | |
473 | def setUp(self):\r | |
474 | os.mkdir(test_support.TESTFN)\r | |
475 | \r | |
476 | def test_makedir(self):\r | |
477 | base = test_support.TESTFN\r | |
478 | path = os.path.join(base, 'dir1', 'dir2', 'dir3')\r | |
479 | os.makedirs(path) # Should work\r | |
480 | path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')\r | |
481 | os.makedirs(path)\r | |
482 | \r | |
483 | # Try paths with a '.' in them\r | |
484 | self.assertRaises(OSError, os.makedirs, os.curdir)\r | |
485 | path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)\r | |
486 | os.makedirs(path)\r | |
487 | path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',\r | |
488 | 'dir5', 'dir6')\r | |
489 | os.makedirs(path)\r | |
490 | \r | |
491 | \r | |
492 | \r | |
493 | \r | |
494 | def tearDown(self):\r | |
495 | path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',\r | |
496 | 'dir4', 'dir5', 'dir6')\r | |
497 | # If the tests failed, the bottom-most directory ('../dir6')\r | |
498 | # may not have been created, so we look for the outermost directory\r | |
499 | # that exists.\r | |
500 | while not os.path.exists(path) and path != test_support.TESTFN:\r | |
501 | path = os.path.dirname(path)\r | |
502 | \r | |
503 | os.removedirs(path)\r | |
504 | \r | |
505 | class DevNullTests (unittest.TestCase):\r | |
506 | def test_devnull(self):\r | |
507 | f = file(os.devnull, 'w')\r | |
508 | f.write('hello')\r | |
509 | f.close()\r | |
510 | f = file(os.devnull, 'r')\r | |
511 | self.assertEqual(f.read(), '')\r | |
512 | f.close()\r | |
513 | \r | |
514 | class URandomTests (unittest.TestCase):\r | |
515 | def test_urandom(self):\r | |
516 | try:\r | |
517 | self.assertEqual(len(os.urandom(1)), 1)\r | |
518 | self.assertEqual(len(os.urandom(10)), 10)\r | |
519 | self.assertEqual(len(os.urandom(100)), 100)\r | |
520 | self.assertEqual(len(os.urandom(1000)), 1000)\r | |
521 | # see http://bugs.python.org/issue3708\r | |
522 | self.assertRaises(TypeError, os.urandom, 0.9)\r | |
523 | self.assertRaises(TypeError, os.urandom, 1.1)\r | |
524 | self.assertRaises(TypeError, os.urandom, 2.0)\r | |
525 | except NotImplementedError:\r | |
526 | pass\r | |
527 | \r | |
528 | def test_execvpe_with_bad_arglist(self):\r | |
529 | self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)\r | |
530 | \r | |
531 | class Win32ErrorTests(unittest.TestCase):\r | |
532 | def test_rename(self):\r | |
533 | self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")\r | |
534 | \r | |
535 | def test_remove(self):\r | |
536 | self.assertRaises(WindowsError, os.remove, test_support.TESTFN)\r | |
537 | \r | |
538 | def test_chdir(self):\r | |
539 | self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)\r | |
540 | \r | |
541 | def test_mkdir(self):\r | |
542 | f = open(test_support.TESTFN, "w")\r | |
543 | try:\r | |
544 | self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)\r | |
545 | finally:\r | |
546 | f.close()\r | |
547 | os.unlink(test_support.TESTFN)\r | |
548 | \r | |
549 | def test_utime(self):\r | |
550 | self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)\r | |
551 | \r | |
552 | def test_chmod(self):\r | |
553 | self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)\r | |
554 | \r | |
555 | class TestInvalidFD(unittest.TestCase):\r | |
556 | singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",\r | |
557 | "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]\r | |
558 | #singles.append("close")\r | |
559 | #We omit close because it doesn'r raise an exception on some platforms\r | |
560 | def get_single(f):\r | |
561 | def helper(self):\r | |
562 | if hasattr(os, f):\r | |
563 | self.check(getattr(os, f))\r | |
564 | return helper\r | |
565 | for f in singles:\r | |
566 | locals()["test_"+f] = get_single(f)\r | |
567 | \r | |
568 | def check(self, f, *args):\r | |
569 | try:\r | |
570 | f(test_support.make_bad_fd(), *args)\r | |
571 | except OSError as e:\r | |
572 | self.assertEqual(e.errno, errno.EBADF)\r | |
573 | else:\r | |
574 | self.fail("%r didn't raise a OSError with a bad file descriptor"\r | |
575 | % f)\r | |
576 | \r | |
577 | def test_isatty(self):\r | |
578 | if hasattr(os, "isatty"):\r | |
579 | self.assertEqual(os.isatty(test_support.make_bad_fd()), False)\r | |
580 | \r | |
581 | def test_closerange(self):\r | |
582 | if hasattr(os, "closerange"):\r | |
583 | fd = test_support.make_bad_fd()\r | |
584 | # Make sure none of the descriptors we are about to close are\r | |
585 | # currently valid (issue 6542).\r | |
586 | for i in range(10):\r | |
587 | try: os.fstat(fd+i)\r | |
588 | except OSError:\r | |
589 | pass\r | |
590 | else:\r | |
591 | break\r | |
592 | if i < 2:\r | |
593 | raise unittest.SkipTest(\r | |
594 | "Unable to acquire a range of invalid file descriptors")\r | |
595 | self.assertEqual(os.closerange(fd, fd + i-1), None)\r | |
596 | \r | |
597 | def test_dup2(self):\r | |
598 | if hasattr(os, "dup2"):\r | |
599 | self.check(os.dup2, 20)\r | |
600 | \r | |
601 | def test_fchmod(self):\r | |
602 | if hasattr(os, "fchmod"):\r | |
603 | self.check(os.fchmod, 0)\r | |
604 | \r | |
605 | def test_fchown(self):\r | |
606 | if hasattr(os, "fchown"):\r | |
607 | self.check(os.fchown, -1, -1)\r | |
608 | \r | |
609 | def test_fpathconf(self):\r | |
610 | if hasattr(os, "fpathconf"):\r | |
611 | self.check(os.fpathconf, "PC_NAME_MAX")\r | |
612 | \r | |
613 | def test_ftruncate(self):\r | |
614 | if hasattr(os, "ftruncate"):\r | |
615 | self.check(os.ftruncate, 0)\r | |
616 | \r | |
617 | def test_lseek(self):\r | |
618 | if hasattr(os, "lseek"):\r | |
619 | self.check(os.lseek, 0, 0)\r | |
620 | \r | |
621 | def test_read(self):\r | |
622 | if hasattr(os, "read"):\r | |
623 | self.check(os.read, 1)\r | |
624 | \r | |
625 | def test_tcsetpgrpt(self):\r | |
626 | if hasattr(os, "tcsetpgrp"):\r | |
627 | self.check(os.tcsetpgrp, 0)\r | |
628 | \r | |
629 | def test_write(self):\r | |
630 | if hasattr(os, "write"):\r | |
631 | self.check(os.write, " ")\r | |
632 | \r | |
633 | if sys.platform != 'win32':\r | |
634 | class Win32ErrorTests(unittest.TestCase):\r | |
635 | pass\r | |
636 | \r | |
637 | class PosixUidGidTests(unittest.TestCase):\r | |
638 | if hasattr(os, 'setuid'):\r | |
639 | def test_setuid(self):\r | |
640 | if os.getuid() != 0:\r | |
641 | self.assertRaises(os.error, os.setuid, 0)\r | |
642 | self.assertRaises(OverflowError, os.setuid, 1<<32)\r | |
643 | \r | |
644 | if hasattr(os, 'setgid'):\r | |
645 | def test_setgid(self):\r | |
646 | if os.getuid() != 0:\r | |
647 | self.assertRaises(os.error, os.setgid, 0)\r | |
648 | self.assertRaises(OverflowError, os.setgid, 1<<32)\r | |
649 | \r | |
650 | if hasattr(os, 'seteuid'):\r | |
651 | def test_seteuid(self):\r | |
652 | if os.getuid() != 0:\r | |
653 | self.assertRaises(os.error, os.seteuid, 0)\r | |
654 | self.assertRaises(OverflowError, os.seteuid, 1<<32)\r | |
655 | \r | |
656 | if hasattr(os, 'setegid'):\r | |
657 | def test_setegid(self):\r | |
658 | if os.getuid() != 0:\r | |
659 | self.assertRaises(os.error, os.setegid, 0)\r | |
660 | self.assertRaises(OverflowError, os.setegid, 1<<32)\r | |
661 | \r | |
662 | if hasattr(os, 'setreuid'):\r | |
663 | def test_setreuid(self):\r | |
664 | if os.getuid() != 0:\r | |
665 | self.assertRaises(os.error, os.setreuid, 0, 0)\r | |
666 | self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)\r | |
667 | self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)\r | |
668 | \r | |
669 | def test_setreuid_neg1(self):\r | |
670 | # Needs to accept -1. We run this in a subprocess to avoid\r | |
671 | # altering the test runner's process state (issue8045).\r | |
672 | subprocess.check_call([\r | |
673 | sys.executable, '-c',\r | |
674 | 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])\r | |
675 | \r | |
676 | if hasattr(os, 'setregid'):\r | |
677 | def test_setregid(self):\r | |
678 | if os.getuid() != 0:\r | |
679 | self.assertRaises(os.error, os.setregid, 0, 0)\r | |
680 | self.assertRaises(OverflowError, os.setregid, 1<<32, 0)\r | |
681 | self.assertRaises(OverflowError, os.setregid, 0, 1<<32)\r | |
682 | \r | |
683 | def test_setregid_neg1(self):\r | |
684 | # Needs to accept -1. We run this in a subprocess to avoid\r | |
685 | # altering the test runner's process state (issue8045).\r | |
686 | subprocess.check_call([\r | |
687 | sys.executable, '-c',\r | |
688 | 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])\r | |
689 | else:\r | |
690 | class PosixUidGidTests(unittest.TestCase):\r | |
691 | pass\r | |
692 | \r | |
693 | @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")\r | |
694 | class Win32KillTests(unittest.TestCase):\r | |
695 | def _kill(self, sig):\r | |
696 | # Start sys.executable as a subprocess and communicate from the\r | |
697 | # subprocess to the parent that the interpreter is ready. When it\r | |
698 | # becomes ready, send *sig* via os.kill to the subprocess and check\r | |
699 | # that the return code is equal to *sig*.\r | |
700 | import ctypes\r | |
701 | from ctypes import wintypes\r | |
702 | import msvcrt\r | |
703 | \r | |
704 | # Since we can't access the contents of the process' stdout until the\r | |
705 | # process has exited, use PeekNamedPipe to see what's inside stdout\r | |
706 | # without waiting. This is done so we can tell that the interpreter\r | |
707 | # is started and running at a point where it could handle a signal.\r | |
708 | PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe\r | |
709 | PeekNamedPipe.restype = wintypes.BOOL\r | |
710 | PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle\r | |
711 | ctypes.POINTER(ctypes.c_char), # stdout buf\r | |
712 | wintypes.DWORD, # Buffer size\r | |
713 | ctypes.POINTER(wintypes.DWORD), # bytes read\r | |
714 | ctypes.POINTER(wintypes.DWORD), # bytes avail\r | |
715 | ctypes.POINTER(wintypes.DWORD)) # bytes left\r | |
716 | msg = "running"\r | |
717 | proc = subprocess.Popen([sys.executable, "-c",\r | |
718 | "import sys;"\r | |
719 | "sys.stdout.write('{}');"\r | |
720 | "sys.stdout.flush();"\r | |
721 | "input()".format(msg)],\r | |
722 | stdout=subprocess.PIPE,\r | |
723 | stderr=subprocess.PIPE,\r | |
724 | stdin=subprocess.PIPE)\r | |
725 | self.addCleanup(proc.stdout.close)\r | |
726 | self.addCleanup(proc.stderr.close)\r | |
727 | self.addCleanup(proc.stdin.close)\r | |
728 | \r | |
729 | count, max = 0, 100\r | |
730 | while count < max and proc.poll() is None:\r | |
731 | # Create a string buffer to store the result of stdout from the pipe\r | |
732 | buf = ctypes.create_string_buffer(len(msg))\r | |
733 | # Obtain the text currently in proc.stdout\r | |
734 | # Bytes read/avail/left are left as NULL and unused\r | |
735 | rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),\r | |
736 | buf, ctypes.sizeof(buf), None, None, None)\r | |
737 | self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")\r | |
738 | if buf.value:\r | |
739 | self.assertEqual(msg, buf.value)\r | |
740 | break\r | |
741 | time.sleep(0.1)\r | |
742 | count += 1\r | |
743 | else:\r | |
744 | self.fail("Did not receive communication from the subprocess")\r | |
745 | \r | |
746 | os.kill(proc.pid, sig)\r | |
747 | self.assertEqual(proc.wait(), sig)\r | |
748 | \r | |
749 | def test_kill_sigterm(self):\r | |
750 | # SIGTERM doesn't mean anything special, but make sure it works\r | |
751 | self._kill(signal.SIGTERM)\r | |
752 | \r | |
753 | def test_kill_int(self):\r | |
754 | # os.kill on Windows can take an int which gets set as the exit code\r | |
755 | self._kill(100)\r | |
756 | \r | |
757 | def _kill_with_event(self, event, name):\r | |
758 | tagname = "test_os_%s" % uuid.uuid1()\r | |
759 | m = mmap.mmap(-1, 1, tagname)\r | |
760 | m[0] = '0'\r | |
761 | # Run a script which has console control handling enabled.\r | |
762 | proc = subprocess.Popen([sys.executable,\r | |
763 | os.path.join(os.path.dirname(__file__),\r | |
764 | "win_console_handler.py"), tagname],\r | |
765 | creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)\r | |
766 | # Let the interpreter startup before we send signals. See #3137.\r | |
767 | count, max = 0, 20\r | |
768 | while count < max and proc.poll() is None:\r | |
769 | if m[0] == '1':\r | |
770 | break\r | |
771 | time.sleep(0.5)\r | |
772 | count += 1\r | |
773 | else:\r | |
774 | self.fail("Subprocess didn't finish initialization")\r | |
775 | os.kill(proc.pid, event)\r | |
776 | # proc.send_signal(event) could also be done here.\r | |
777 | # Allow time for the signal to be passed and the process to exit.\r | |
778 | time.sleep(0.5)\r | |
779 | if not proc.poll():\r | |
780 | # Forcefully kill the process if we weren't able to signal it.\r | |
781 | os.kill(proc.pid, signal.SIGINT)\r | |
782 | self.fail("subprocess did not stop on {}".format(name))\r | |
783 | \r | |
784 | @unittest.skip("subprocesses aren't inheriting CTRL+C property")\r | |
785 | def test_CTRL_C_EVENT(self):\r | |
786 | from ctypes import wintypes\r | |
787 | import ctypes\r | |
788 | \r | |
789 | # Make a NULL value by creating a pointer with no argument.\r | |
790 | NULL = ctypes.POINTER(ctypes.c_int)()\r | |
791 | SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler\r | |
792 | SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),\r | |
793 | wintypes.BOOL)\r | |
794 | SetConsoleCtrlHandler.restype = wintypes.BOOL\r | |
795 | \r | |
796 | # Calling this with NULL and FALSE causes the calling process to\r | |
797 | # handle CTRL+C, rather than ignore it. This property is inherited\r | |
798 | # by subprocesses.\r | |
799 | SetConsoleCtrlHandler(NULL, 0)\r | |
800 | \r | |
801 | self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")\r | |
802 | \r | |
803 | def test_CTRL_BREAK_EVENT(self):\r | |
804 | self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")\r | |
805 | \r | |
806 | \r | |
807 | def test_main():\r | |
808 | test_support.run_unittest(\r | |
809 | FileTests,\r | |
810 | TemporaryFileTests,\r | |
811 | StatAttributeTests,\r | |
812 | EnvironTests,\r | |
813 | WalkTests,\r | |
814 | MakedirTests,\r | |
815 | DevNullTests,\r | |
816 | URandomTests,\r | |
817 | Win32ErrorTests,\r | |
818 | TestInvalidFD,\r | |
819 | PosixUidGidTests,\r | |
820 | Win32KillTests\r | |
821 | )\r | |
822 | \r | |
823 | if __name__ == "__main__":\r | |
824 | test_main()\r |