]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_os.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_os.py
CommitLineData
4710c53d 1# As a test suite for the os module, this is woefully inadequate, but this\r
2# does add tests for a few functions which have been determined to be more\r
3# portable than they had been thought to be.\r
4\r
5import os\r
6import errno\r
7import unittest\r
8import warnings\r
9import sys\r
10import signal\r
11import subprocess\r
12import time\r
13from test import test_support\r
14import mmap\r
15import uuid\r
16\r
17warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)\r
18warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)\r
19\r
20# Tests creating TESTFN\r
21class FileTests(unittest.TestCase):\r
22 def setUp(self):\r
23 if os.path.exists(test_support.TESTFN):\r
24 os.unlink(test_support.TESTFN)\r
25 tearDown = setUp\r
26\r
27 def test_access(self):\r
28 f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)\r
29 os.close(f)\r
30 self.assertTrue(os.access(test_support.TESTFN, os.W_OK))\r
31\r
32 def test_closerange(self):\r
33 first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)\r
34 # We must allocate two consecutive file descriptors, otherwise\r
35 # it will mess up other file descriptors (perhaps even the three\r
36 # standard ones).\r
37 second = os.dup(first)\r
38 try:\r
39 retries = 0\r
40 while second != first + 1:\r
41 os.close(first)\r
42 retries += 1\r
43 if retries > 10:\r
44 # XXX test skipped\r
45 self.skipTest("couldn't allocate two consecutive fds")\r
46 first, second = second, os.dup(second)\r
47 finally:\r
48 os.close(second)\r
49 # close a fd that is open, and one that isn't\r
50 os.closerange(first, first + 2)\r
51 self.assertRaises(OSError, os.write, first, "a")\r
52\r
53 @test_support.cpython_only\r
54 def test_rename(self):\r
55 path = unicode(test_support.TESTFN)\r
56 old = sys.getrefcount(path)\r
57 self.assertRaises(TypeError, os.rename, path, 0)\r
58 new = sys.getrefcount(path)\r
59 self.assertEqual(old, new)\r
60\r
61\r
62class TemporaryFileTests(unittest.TestCase):\r
63 def setUp(self):\r
64 self.files = []\r
65 os.mkdir(test_support.TESTFN)\r
66\r
67 def tearDown(self):\r
68 for name in self.files:\r
69 os.unlink(name)\r
70 os.rmdir(test_support.TESTFN)\r
71\r
72 def check_tempfile(self, name):\r
73 # make sure it doesn't already exist:\r
74 self.assertFalse(os.path.exists(name),\r
75 "file already exists for temporary file")\r
76 # make sure we can create the file\r
77 open(name, "w")\r
78 self.files.append(name)\r
79\r
80 def test_tempnam(self):\r
81 if not hasattr(os, "tempnam"):\r
82 return\r
83 with warnings.catch_warnings():\r
84 warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,\r
85 r"test_os$")\r
86 warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)\r
87 self.check_tempfile(os.tempnam())\r
88\r
89 name = os.tempnam(test_support.TESTFN)\r
90 self.check_tempfile(name)\r
91\r
92 name = os.tempnam(test_support.TESTFN, "pfx")\r
93 self.assertTrue(os.path.basename(name)[:3] == "pfx")\r
94 self.check_tempfile(name)\r
95\r
96 def test_tmpfile(self):\r
97 if not hasattr(os, "tmpfile"):\r
98 return\r
99 # As with test_tmpnam() below, the Windows implementation of tmpfile()\r
100 # attempts to create a file in the root directory of the current drive.\r
101 # On Vista and Server 2008, this test will always fail for normal users\r
102 # as writing to the root directory requires elevated privileges. With\r
103 # XP and below, the semantics of tmpfile() are the same, but the user\r
104 # running the test is more likely to have administrative privileges on\r
105 # their account already. If that's the case, then os.tmpfile() should\r
106 # work. In order to make this test as useful as possible, rather than\r
107 # trying to detect Windows versions or whether or not the user has the\r
108 # right permissions, just try and create a file in the root directory\r
109 # and see if it raises a 'Permission denied' OSError. If it does, then\r
110 # test that a subsequent call to os.tmpfile() raises the same error. If\r
111 # it doesn't, assume we're on XP or below and the user running the test\r
112 # has administrative privileges, and proceed with the test as normal.\r
113 with warnings.catch_warnings():\r
114 warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)\r
115\r
116 if sys.platform == 'win32':\r
117 name = '\\python_test_os_test_tmpfile.txt'\r
118 if os.path.exists(name):\r
119 os.remove(name)\r
120 try:\r
121 fp = open(name, 'w')\r
122 except IOError, first:\r
123 # open() failed, assert tmpfile() fails in the same way.\r
124 # Although open() raises an IOError and os.tmpfile() raises an\r
125 # OSError(), 'args' will be (13, 'Permission denied') in both\r
126 # cases.\r
127 try:\r
128 fp = os.tmpfile()\r
129 except OSError, second:\r
130 self.assertEqual(first.args, second.args)\r
131 else:\r
132 self.fail("expected os.tmpfile() to raise OSError")\r
133 return\r
134 else:\r
135 # open() worked, therefore, tmpfile() should work. Close our\r
136 # dummy file and proceed with the test as normal.\r
137 fp.close()\r
138 os.remove(name)\r
139\r
140 fp = os.tmpfile()\r
141 fp.write("foobar")\r
142 fp.seek(0,0)\r
143 s = fp.read()\r
144 fp.close()\r
145 self.assertTrue(s == "foobar")\r
146\r
147 def test_tmpnam(self):\r
148 if not hasattr(os, "tmpnam"):\r
149 return\r
150 with warnings.catch_warnings():\r
151 warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,\r
152 r"test_os$")\r
153 warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)\r
154\r
155 name = os.tmpnam()\r
156 if sys.platform in ("win32",):\r
157 # The Windows tmpnam() seems useless. From the MS docs:\r
158 #\r
159 # The character string that tmpnam creates consists of\r
160 # the path prefix, defined by the entry P_tmpdir in the\r
161 # file STDIO.H, followed by a sequence consisting of the\r
162 # digit characters '0' through '9'; the numerical value\r
163 # of this string is in the range 1 - 65,535. Changing the\r
164 # definitions of L_tmpnam or P_tmpdir in STDIO.H does not\r
165 # change the operation of tmpnam.\r
166 #\r
167 # The really bizarre part is that, at least under MSVC6,\r
168 # P_tmpdir is "\\". That is, the path returned refers to\r
169 # the root of the current drive. That's a terrible place to\r
170 # put temp files, and, depending on privileges, the user\r
171 # may not even be able to open a file in the root directory.\r
172 self.assertFalse(os.path.exists(name),\r
173 "file already exists for temporary file")\r
174 else:\r
175 self.check_tempfile(name)\r
176\r
177# Test attributes on return values from os.*stat* family.\r
178class StatAttributeTests(unittest.TestCase):\r
179 def setUp(self):\r
180 os.mkdir(test_support.TESTFN)\r
181 self.fname = os.path.join(test_support.TESTFN, "f1")\r
182 f = open(self.fname, 'wb')\r
183 f.write("ABC")\r
184 f.close()\r
185\r
186 def tearDown(self):\r
187 os.unlink(self.fname)\r
188 os.rmdir(test_support.TESTFN)\r
189\r
190 def test_stat_attributes(self):\r
191 if not hasattr(os, "stat"):\r
192 return\r
193\r
194 import stat\r
195 result = os.stat(self.fname)\r
196\r
197 # Make sure direct access works\r
198 self.assertEqual(result[stat.ST_SIZE], 3)\r
199 self.assertEqual(result.st_size, 3)\r
200\r
201 # Make sure all the attributes are there\r
202 members = dir(result)\r
203 for name in dir(stat):\r
204 if name[:3] == 'ST_':\r
205 attr = name.lower()\r
206 if name.endswith("TIME"):\r
207 def trunc(x): return int(x)\r
208 else:\r
209 def trunc(x): return x\r
210 self.assertEqual(trunc(getattr(result, attr)),\r
211 result[getattr(stat, name)])\r
212 self.assertIn(attr, members)\r
213\r
214 try:\r
215 result[200]\r
216 self.fail("No exception thrown")\r
217 except IndexError:\r
218 pass\r
219\r
220 # Make sure that assignment fails\r
221 try:\r
222 result.st_mode = 1\r
223 self.fail("No exception thrown")\r
224 except (AttributeError, TypeError):\r
225 pass\r
226\r
227 try:\r
228 result.st_rdev = 1\r
229 self.fail("No exception thrown")\r
230 except (AttributeError, TypeError):\r
231 pass\r
232\r
233 try:\r
234 result.parrot = 1\r
235 self.fail("No exception thrown")\r
236 except AttributeError:\r
237 pass\r
238\r
239 # Use the stat_result constructor with a too-short tuple.\r
240 try:\r
241 result2 = os.stat_result((10,))\r
242 self.fail("No exception thrown")\r
243 except TypeError:\r
244 pass\r
245\r
246 # Use the constructor with a too-long tuple.\r
247 try:\r
248 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))\r
249 except TypeError:\r
250 pass\r
251\r
252\r
253 def test_statvfs_attributes(self):\r
254 if not hasattr(os, "statvfs"):\r
255 return\r
256\r
257 try:\r
258 result = os.statvfs(self.fname)\r
259 except OSError, e:\r
260 # On AtheOS, glibc always returns ENOSYS\r
261 if e.errno == errno.ENOSYS:\r
262 return\r
263\r
264 # Make sure direct access works\r
265 self.assertEqual(result.f_bfree, result[3])\r
266\r
267 # Make sure all the attributes are there.\r
268 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',\r
269 'ffree', 'favail', 'flag', 'namemax')\r
270 for value, member in enumerate(members):\r
271 self.assertEqual(getattr(result, 'f_' + member), result[value])\r
272\r
273 # Make sure that assignment really fails\r
274 try:\r
275 result.f_bfree = 1\r
276 self.fail("No exception thrown")\r
277 except TypeError:\r
278 pass\r
279\r
280 try:\r
281 result.parrot = 1\r
282 self.fail("No exception thrown")\r
283 except AttributeError:\r
284 pass\r
285\r
286 # Use the constructor with a too-short tuple.\r
287 try:\r
288 result2 = os.statvfs_result((10,))\r
289 self.fail("No exception thrown")\r
290 except TypeError:\r
291 pass\r
292\r
293 # Use the constructor with a too-long tuple.\r
294 try:\r
295 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))\r
296 except TypeError:\r
297 pass\r
298\r
299 def test_utime_dir(self):\r
300 delta = 1000000\r
301 st = os.stat(test_support.TESTFN)\r
302 # round to int, because some systems may support sub-second\r
303 # time stamps in stat, but not in utime.\r
304 os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))\r
305 st2 = os.stat(test_support.TESTFN)\r
306 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))\r
307\r
308 # Restrict test to Win32, since there is no guarantee other\r
309 # systems support centiseconds\r
310 if sys.platform == 'win32':\r
311 def get_file_system(path):\r
312 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'\r
313 import ctypes\r
314 kernel32 = ctypes.windll.kernel32\r
315 buf = ctypes.create_string_buffer("", 100)\r
316 if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):\r
317 return buf.value\r
318\r
319 if get_file_system(test_support.TESTFN) == "NTFS":\r
320 def test_1565150(self):\r
321 t1 = 1159195039.25\r
322 os.utime(self.fname, (t1, t1))\r
323 self.assertEqual(os.stat(self.fname).st_mtime, t1)\r
324\r
325 def test_large_time(self):\r
326 t1 = 5000000000 # some day in 2128\r
327 os.utime(self.fname, (t1, t1))\r
328 self.assertEqual(os.stat(self.fname).st_mtime, t1)\r
329\r
330 def test_1686475(self):\r
331 # Verify that an open file can be stat'ed\r
332 try:\r
333 os.stat(r"c:\pagefile.sys")\r
334 except WindowsError, e:\r
335 if e.errno == 2: # file does not exist; cannot run test\r
336 return\r
337 self.fail("Could not stat pagefile.sys")\r
338\r
339from test import mapping_tests\r
340\r
341class EnvironTests(mapping_tests.BasicTestMappingProtocol):\r
342 """check that os.environ object conform to mapping protocol"""\r
343 type2test = None\r
344 def _reference(self):\r
345 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}\r
346 def _empty_mapping(self):\r
347 os.environ.clear()\r
348 return os.environ\r
349 def setUp(self):\r
350 self.__save = dict(os.environ)\r
351 os.environ.clear()\r
352 def tearDown(self):\r
353 os.environ.clear()\r
354 os.environ.update(self.__save)\r
355\r
356 # Bug 1110478\r
357 def test_update2(self):\r
358 if os.path.exists("/bin/sh"):\r
359 os.environ.update(HELLO="World")\r
360 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:\r
361 value = popen.read().strip()\r
362 self.assertEqual(value, "World")\r
363\r
364class WalkTests(unittest.TestCase):\r
365 """Tests for os.walk()."""\r
366\r
367 def test_traversal(self):\r
368 import os\r
369 from os.path import join\r
370\r
371 # Build:\r
372 # TESTFN/\r
373 # TEST1/ a file kid and two directory kids\r
374 # tmp1\r
375 # SUB1/ a file kid and a directory kid\r
376 # tmp2\r
377 # SUB11/ no kids\r
378 # SUB2/ a file kid and a dirsymlink kid\r
379 # tmp3\r
380 # link/ a symlink to TESTFN.2\r
381 # TEST2/\r
382 # tmp4 a lone file\r
383 walk_path = join(test_support.TESTFN, "TEST1")\r
384 sub1_path = join(walk_path, "SUB1")\r
385 sub11_path = join(sub1_path, "SUB11")\r
386 sub2_path = join(walk_path, "SUB2")\r
387 tmp1_path = join(walk_path, "tmp1")\r
388 tmp2_path = join(sub1_path, "tmp2")\r
389 tmp3_path = join(sub2_path, "tmp3")\r
390 link_path = join(sub2_path, "link")\r
391 t2_path = join(test_support.TESTFN, "TEST2")\r
392 tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")\r
393\r
394 # Create stuff.\r
395 os.makedirs(sub11_path)\r
396 os.makedirs(sub2_path)\r
397 os.makedirs(t2_path)\r
398 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:\r
399 f = file(path, "w")\r
400 f.write("I'm " + path + " and proud of it. Blame test_os.\n")\r
401 f.close()\r
402 if hasattr(os, "symlink"):\r
403 os.symlink(os.path.abspath(t2_path), link_path)\r
404 sub2_tree = (sub2_path, ["link"], ["tmp3"])\r
405 else:\r
406 sub2_tree = (sub2_path, [], ["tmp3"])\r
407\r
408 # Walk top-down.\r
409 all = list(os.walk(walk_path))\r
410 self.assertEqual(len(all), 4)\r
411 # We can't know which order SUB1 and SUB2 will appear in.\r
412 # Not flipped: TESTFN, SUB1, SUB11, SUB2\r
413 # flipped: TESTFN, SUB2, SUB1, SUB11\r
414 flipped = all[0][1][0] != "SUB1"\r
415 all[0][1].sort()\r
416 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))\r
417 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))\r
418 self.assertEqual(all[2 + flipped], (sub11_path, [], []))\r
419 self.assertEqual(all[3 - 2 * flipped], sub2_tree)\r
420\r
421 # Prune the search.\r
422 all = []\r
423 for root, dirs, files in os.walk(walk_path):\r
424 all.append((root, dirs, files))\r
425 # Don't descend into SUB1.\r
426 if 'SUB1' in dirs:\r
427 # Note that this also mutates the dirs we appended to all!\r
428 dirs.remove('SUB1')\r
429 self.assertEqual(len(all), 2)\r
430 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))\r
431 self.assertEqual(all[1], sub2_tree)\r
432\r
433 # Walk bottom-up.\r
434 all = list(os.walk(walk_path, topdown=False))\r
435 self.assertEqual(len(all), 4)\r
436 # We can't know which order SUB1 and SUB2 will appear in.\r
437 # Not flipped: SUB11, SUB1, SUB2, TESTFN\r
438 # flipped: SUB2, SUB11, SUB1, TESTFN\r
439 flipped = all[3][1][0] != "SUB1"\r
440 all[3][1].sort()\r
441 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))\r
442 self.assertEqual(all[flipped], (sub11_path, [], []))\r
443 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))\r
444 self.assertEqual(all[2 - 2 * flipped], sub2_tree)\r
445\r
446 if hasattr(os, "symlink"):\r
447 # Walk, following symlinks.\r
448 for root, dirs, files in os.walk(walk_path, followlinks=True):\r
449 if root == link_path:\r
450 self.assertEqual(dirs, [])\r
451 self.assertEqual(files, ["tmp4"])\r
452 break\r
453 else:\r
454 self.fail("Didn't follow symlink with followlinks=True")\r
455\r
456 def tearDown(self):\r
457 # Tear everything down. This is a decent use for bottom-up on\r
458 # Windows, which doesn't have a recursive delete command. The\r
459 # (not so) subtlety is that rmdir will fail unless the dir's\r
460 # kids are removed first, so bottom up is essential.\r
461 for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):\r
462 for name in files:\r
463 os.remove(os.path.join(root, name))\r
464 for name in dirs:\r
465 dirname = os.path.join(root, name)\r
466 if not os.path.islink(dirname):\r
467 os.rmdir(dirname)\r
468 else:\r
469 os.remove(dirname)\r
470 os.rmdir(test_support.TESTFN)\r
471\r
472class MakedirTests (unittest.TestCase):\r
473 def setUp(self):\r
474 os.mkdir(test_support.TESTFN)\r
475\r
476 def test_makedir(self):\r
477 base = test_support.TESTFN\r
478 path = os.path.join(base, 'dir1', 'dir2', 'dir3')\r
479 os.makedirs(path) # Should work\r
480 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')\r
481 os.makedirs(path)\r
482\r
483 # Try paths with a '.' in them\r
484 self.assertRaises(OSError, os.makedirs, os.curdir)\r
485 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)\r
486 os.makedirs(path)\r
487 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',\r
488 'dir5', 'dir6')\r
489 os.makedirs(path)\r
490\r
491\r
492\r
493\r
494 def tearDown(self):\r
495 path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',\r
496 'dir4', 'dir5', 'dir6')\r
497 # If the tests failed, the bottom-most directory ('../dir6')\r
498 # may not have been created, so we look for the outermost directory\r
499 # that exists.\r
500 while not os.path.exists(path) and path != test_support.TESTFN:\r
501 path = os.path.dirname(path)\r
502\r
503 os.removedirs(path)\r
504\r
505class DevNullTests (unittest.TestCase):\r
506 def test_devnull(self):\r
507 f = file(os.devnull, 'w')\r
508 f.write('hello')\r
509 f.close()\r
510 f = file(os.devnull, 'r')\r
511 self.assertEqual(f.read(), '')\r
512 f.close()\r
513\r
514class URandomTests (unittest.TestCase):\r
515 def test_urandom(self):\r
516 try:\r
517 self.assertEqual(len(os.urandom(1)), 1)\r
518 self.assertEqual(len(os.urandom(10)), 10)\r
519 self.assertEqual(len(os.urandom(100)), 100)\r
520 self.assertEqual(len(os.urandom(1000)), 1000)\r
521 # see http://bugs.python.org/issue3708\r
522 self.assertRaises(TypeError, os.urandom, 0.9)\r
523 self.assertRaises(TypeError, os.urandom, 1.1)\r
524 self.assertRaises(TypeError, os.urandom, 2.0)\r
525 except NotImplementedError:\r
526 pass\r
527\r
528 def test_execvpe_with_bad_arglist(self):\r
529 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)\r
530\r
531class Win32ErrorTests(unittest.TestCase):\r
532 def test_rename(self):\r
533 self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")\r
534\r
535 def test_remove(self):\r
536 self.assertRaises(WindowsError, os.remove, test_support.TESTFN)\r
537\r
538 def test_chdir(self):\r
539 self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)\r
540\r
541 def test_mkdir(self):\r
542 f = open(test_support.TESTFN, "w")\r
543 try:\r
544 self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)\r
545 finally:\r
546 f.close()\r
547 os.unlink(test_support.TESTFN)\r
548\r
549 def test_utime(self):\r
550 self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)\r
551\r
552 def test_chmod(self):\r
553 self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)\r
554\r
555class TestInvalidFD(unittest.TestCase):\r
556 singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",\r
557 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]\r
558 #singles.append("close")\r
559 #We omit close because it doesn'r raise an exception on some platforms\r
560 def get_single(f):\r
561 def helper(self):\r
562 if hasattr(os, f):\r
563 self.check(getattr(os, f))\r
564 return helper\r
565 for f in singles:\r
566 locals()["test_"+f] = get_single(f)\r
567\r
568 def check(self, f, *args):\r
569 try:\r
570 f(test_support.make_bad_fd(), *args)\r
571 except OSError as e:\r
572 self.assertEqual(e.errno, errno.EBADF)\r
573 else:\r
574 self.fail("%r didn't raise a OSError with a bad file descriptor"\r
575 % f)\r
576\r
577 def test_isatty(self):\r
578 if hasattr(os, "isatty"):\r
579 self.assertEqual(os.isatty(test_support.make_bad_fd()), False)\r
580\r
581 def test_closerange(self):\r
582 if hasattr(os, "closerange"):\r
583 fd = test_support.make_bad_fd()\r
584 # Make sure none of the descriptors we are about to close are\r
585 # currently valid (issue 6542).\r
586 for i in range(10):\r
587 try: os.fstat(fd+i)\r
588 except OSError:\r
589 pass\r
590 else:\r
591 break\r
592 if i < 2:\r
593 raise unittest.SkipTest(\r
594 "Unable to acquire a range of invalid file descriptors")\r
595 self.assertEqual(os.closerange(fd, fd + i-1), None)\r
596\r
597 def test_dup2(self):\r
598 if hasattr(os, "dup2"):\r
599 self.check(os.dup2, 20)\r
600\r
601 def test_fchmod(self):\r
602 if hasattr(os, "fchmod"):\r
603 self.check(os.fchmod, 0)\r
604\r
605 def test_fchown(self):\r
606 if hasattr(os, "fchown"):\r
607 self.check(os.fchown, -1, -1)\r
608\r
609 def test_fpathconf(self):\r
610 if hasattr(os, "fpathconf"):\r
611 self.check(os.fpathconf, "PC_NAME_MAX")\r
612\r
613 def test_ftruncate(self):\r
614 if hasattr(os, "ftruncate"):\r
615 self.check(os.ftruncate, 0)\r
616\r
617 def test_lseek(self):\r
618 if hasattr(os, "lseek"):\r
619 self.check(os.lseek, 0, 0)\r
620\r
621 def test_read(self):\r
622 if hasattr(os, "read"):\r
623 self.check(os.read, 1)\r
624\r
625 def test_tcsetpgrpt(self):\r
626 if hasattr(os, "tcsetpgrp"):\r
627 self.check(os.tcsetpgrp, 0)\r
628\r
629 def test_write(self):\r
630 if hasattr(os, "write"):\r
631 self.check(os.write, " ")\r
632\r
633if sys.platform != 'win32':\r
634 class Win32ErrorTests(unittest.TestCase):\r
635 pass\r
636\r
637 class PosixUidGidTests(unittest.TestCase):\r
638 if hasattr(os, 'setuid'):\r
639 def test_setuid(self):\r
640 if os.getuid() != 0:\r
641 self.assertRaises(os.error, os.setuid, 0)\r
642 self.assertRaises(OverflowError, os.setuid, 1<<32)\r
643\r
644 if hasattr(os, 'setgid'):\r
645 def test_setgid(self):\r
646 if os.getuid() != 0:\r
647 self.assertRaises(os.error, os.setgid, 0)\r
648 self.assertRaises(OverflowError, os.setgid, 1<<32)\r
649\r
650 if hasattr(os, 'seteuid'):\r
651 def test_seteuid(self):\r
652 if os.getuid() != 0:\r
653 self.assertRaises(os.error, os.seteuid, 0)\r
654 self.assertRaises(OverflowError, os.seteuid, 1<<32)\r
655\r
656 if hasattr(os, 'setegid'):\r
657 def test_setegid(self):\r
658 if os.getuid() != 0:\r
659 self.assertRaises(os.error, os.setegid, 0)\r
660 self.assertRaises(OverflowError, os.setegid, 1<<32)\r
661\r
662 if hasattr(os, 'setreuid'):\r
663 def test_setreuid(self):\r
664 if os.getuid() != 0:\r
665 self.assertRaises(os.error, os.setreuid, 0, 0)\r
666 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)\r
667 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)\r
668\r
669 def test_setreuid_neg1(self):\r
670 # Needs to accept -1. We run this in a subprocess to avoid\r
671 # altering the test runner's process state (issue8045).\r
672 subprocess.check_call([\r
673 sys.executable, '-c',\r
674 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])\r
675\r
676 if hasattr(os, 'setregid'):\r
677 def test_setregid(self):\r
678 if os.getuid() != 0:\r
679 self.assertRaises(os.error, os.setregid, 0, 0)\r
680 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)\r
681 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)\r
682\r
683 def test_setregid_neg1(self):\r
684 # Needs to accept -1. We run this in a subprocess to avoid\r
685 # altering the test runner's process state (issue8045).\r
686 subprocess.check_call([\r
687 sys.executable, '-c',\r
688 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])\r
689else:\r
690 class PosixUidGidTests(unittest.TestCase):\r
691 pass\r
692\r
693@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")\r
694class Win32KillTests(unittest.TestCase):\r
695 def _kill(self, sig):\r
696 # Start sys.executable as a subprocess and communicate from the\r
697 # subprocess to the parent that the interpreter is ready. When it\r
698 # becomes ready, send *sig* via os.kill to the subprocess and check\r
699 # that the return code is equal to *sig*.\r
700 import ctypes\r
701 from ctypes import wintypes\r
702 import msvcrt\r
703\r
704 # Since we can't access the contents of the process' stdout until the\r
705 # process has exited, use PeekNamedPipe to see what's inside stdout\r
706 # without waiting. This is done so we can tell that the interpreter\r
707 # is started and running at a point where it could handle a signal.\r
708 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe\r
709 PeekNamedPipe.restype = wintypes.BOOL\r
710 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle\r
711 ctypes.POINTER(ctypes.c_char), # stdout buf\r
712 wintypes.DWORD, # Buffer size\r
713 ctypes.POINTER(wintypes.DWORD), # bytes read\r
714 ctypes.POINTER(wintypes.DWORD), # bytes avail\r
715 ctypes.POINTER(wintypes.DWORD)) # bytes left\r
716 msg = "running"\r
717 proc = subprocess.Popen([sys.executable, "-c",\r
718 "import sys;"\r
719 "sys.stdout.write('{}');"\r
720 "sys.stdout.flush();"\r
721 "input()".format(msg)],\r
722 stdout=subprocess.PIPE,\r
723 stderr=subprocess.PIPE,\r
724 stdin=subprocess.PIPE)\r
725 self.addCleanup(proc.stdout.close)\r
726 self.addCleanup(proc.stderr.close)\r
727 self.addCleanup(proc.stdin.close)\r
728\r
729 count, max = 0, 100\r
730 while count < max and proc.poll() is None:\r
731 # Create a string buffer to store the result of stdout from the pipe\r
732 buf = ctypes.create_string_buffer(len(msg))\r
733 # Obtain the text currently in proc.stdout\r
734 # Bytes read/avail/left are left as NULL and unused\r
735 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),\r
736 buf, ctypes.sizeof(buf), None, None, None)\r
737 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")\r
738 if buf.value:\r
739 self.assertEqual(msg, buf.value)\r
740 break\r
741 time.sleep(0.1)\r
742 count += 1\r
743 else:\r
744 self.fail("Did not receive communication from the subprocess")\r
745\r
746 os.kill(proc.pid, sig)\r
747 self.assertEqual(proc.wait(), sig)\r
748\r
749 def test_kill_sigterm(self):\r
750 # SIGTERM doesn't mean anything special, but make sure it works\r
751 self._kill(signal.SIGTERM)\r
752\r
753 def test_kill_int(self):\r
754 # os.kill on Windows can take an int which gets set as the exit code\r
755 self._kill(100)\r
756\r
757 def _kill_with_event(self, event, name):\r
758 tagname = "test_os_%s" % uuid.uuid1()\r
759 m = mmap.mmap(-1, 1, tagname)\r
760 m[0] = '0'\r
761 # Run a script which has console control handling enabled.\r
762 proc = subprocess.Popen([sys.executable,\r
763 os.path.join(os.path.dirname(__file__),\r
764 "win_console_handler.py"), tagname],\r
765 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)\r
766 # Let the interpreter startup before we send signals. See #3137.\r
767 count, max = 0, 20\r
768 while count < max and proc.poll() is None:\r
769 if m[0] == '1':\r
770 break\r
771 time.sleep(0.5)\r
772 count += 1\r
773 else:\r
774 self.fail("Subprocess didn't finish initialization")\r
775 os.kill(proc.pid, event)\r
776 # proc.send_signal(event) could also be done here.\r
777 # Allow time for the signal to be passed and the process to exit.\r
778 time.sleep(0.5)\r
779 if not proc.poll():\r
780 # Forcefully kill the process if we weren't able to signal it.\r
781 os.kill(proc.pid, signal.SIGINT)\r
782 self.fail("subprocess did not stop on {}".format(name))\r
783\r
784 @unittest.skip("subprocesses aren't inheriting CTRL+C property")\r
785 def test_CTRL_C_EVENT(self):\r
786 from ctypes import wintypes\r
787 import ctypes\r
788\r
789 # Make a NULL value by creating a pointer with no argument.\r
790 NULL = ctypes.POINTER(ctypes.c_int)()\r
791 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler\r
792 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),\r
793 wintypes.BOOL)\r
794 SetConsoleCtrlHandler.restype = wintypes.BOOL\r
795\r
796 # Calling this with NULL and FALSE causes the calling process to\r
797 # handle CTRL+C, rather than ignore it. This property is inherited\r
798 # by subprocesses.\r
799 SetConsoleCtrlHandler(NULL, 0)\r
800\r
801 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")\r
802\r
803 def test_CTRL_BREAK_EVENT(self):\r
804 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")\r
805\r
806\r
807def test_main():\r
808 test_support.run_unittest(\r
809 FileTests,\r
810 TemporaryFileTests,\r
811 StatAttributeTests,\r
812 EnvironTests,\r
813 WalkTests,\r
814 MakedirTests,\r
815 DevNullTests,\r
816 URandomTests,\r
817 Win32ErrorTests,\r
818 TestInvalidFD,\r
819 PosixUidGidTests,\r
820 Win32KillTests\r
821 )\r
822\r
823if __name__ == "__main__":\r
824 test_main()\r