]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | import unittest\r |
2 | from test import test_support\r | |
3 | import subprocess\r | |
4 | import sys\r | |
5 | import signal\r | |
6 | import os\r | |
7 | import errno\r | |
8 | import tempfile\r | |
9 | import time\r | |
10 | import re\r | |
11 | import sysconfig\r | |
12 | \r | |
13 | mswindows = (sys.platform == "win32")\r | |
14 | \r | |
15 | #\r | |
16 | # Depends on the following external programs: Python\r | |
17 | #\r | |
18 | \r | |
19 | if mswindows:\r | |
20 | SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '\r | |
21 | 'os.O_BINARY);')\r | |
22 | else:\r | |
23 | SETBINARY = ''\r | |
24 | \r | |
25 | \r | |
26 | try:\r | |
27 | mkstemp = tempfile.mkstemp\r | |
28 | except AttributeError:\r | |
29 | # tempfile.mkstemp is not available\r | |
30 | def mkstemp():\r | |
31 | """Replacement for mkstemp, calling mktemp."""\r | |
32 | fname = tempfile.mktemp()\r | |
33 | return os.open(fname, os.O_RDWR|os.O_CREAT), fname\r | |
34 | \r | |
35 | \r | |
36 | class BaseTestCase(unittest.TestCase):\r | |
37 | def setUp(self):\r | |
38 | # Try to minimize the number of children we have so this test\r | |
39 | # doesn't crash on some buildbots (Alphas in particular).\r | |
40 | test_support.reap_children()\r | |
41 | \r | |
42 | def tearDown(self):\r | |
43 | for inst in subprocess._active:\r | |
44 | inst.wait()\r | |
45 | subprocess._cleanup()\r | |
46 | self.assertFalse(subprocess._active, "subprocess._active not empty")\r | |
47 | \r | |
48 | def assertStderrEqual(self, stderr, expected, msg=None):\r | |
49 | # In a debug build, stuff like "[6580 refs]" is printed to stderr at\r | |
50 | # shutdown time. That frustrates tests trying to check stderr produced\r | |
51 | # from a spawned Python process.\r | |
52 | actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)\r | |
53 | self.assertEqual(actual, expected, msg)\r | |
54 | \r | |
55 | \r | |
56 | class ProcessTestCase(BaseTestCase):\r | |
57 | \r | |
58 | def test_call_seq(self):\r | |
59 | # call() function with sequence argument\r | |
60 | rc = subprocess.call([sys.executable, "-c",\r | |
61 | "import sys; sys.exit(47)"])\r | |
62 | self.assertEqual(rc, 47)\r | |
63 | \r | |
64 | def test_check_call_zero(self):\r | |
65 | # check_call() function with zero return code\r | |
66 | rc = subprocess.check_call([sys.executable, "-c",\r | |
67 | "import sys; sys.exit(0)"])\r | |
68 | self.assertEqual(rc, 0)\r | |
69 | \r | |
70 | def test_check_call_nonzero(self):\r | |
71 | # check_call() function with non-zero return code\r | |
72 | with self.assertRaises(subprocess.CalledProcessError) as c:\r | |
73 | subprocess.check_call([sys.executable, "-c",\r | |
74 | "import sys; sys.exit(47)"])\r | |
75 | self.assertEqual(c.exception.returncode, 47)\r | |
76 | \r | |
77 | def test_check_output(self):\r | |
78 | # check_output() function with zero return code\r | |
79 | output = subprocess.check_output(\r | |
80 | [sys.executable, "-c", "print 'BDFL'"])\r | |
81 | self.assertIn('BDFL', output)\r | |
82 | \r | |
83 | def test_check_output_nonzero(self):\r | |
84 | # check_call() function with non-zero return code\r | |
85 | with self.assertRaises(subprocess.CalledProcessError) as c:\r | |
86 | subprocess.check_output(\r | |
87 | [sys.executable, "-c", "import sys; sys.exit(5)"])\r | |
88 | self.assertEqual(c.exception.returncode, 5)\r | |
89 | \r | |
90 | def test_check_output_stderr(self):\r | |
91 | # check_output() function stderr redirected to stdout\r | |
92 | output = subprocess.check_output(\r | |
93 | [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],\r | |
94 | stderr=subprocess.STDOUT)\r | |
95 | self.assertIn('BDFL', output)\r | |
96 | \r | |
97 | def test_check_output_stdout_arg(self):\r | |
98 | # check_output() function stderr redirected to stdout\r | |
99 | with self.assertRaises(ValueError) as c:\r | |
100 | output = subprocess.check_output(\r | |
101 | [sys.executable, "-c", "print 'will not be run'"],\r | |
102 | stdout=sys.stdout)\r | |
103 | self.fail("Expected ValueError when stdout arg supplied.")\r | |
104 | self.assertIn('stdout', c.exception.args[0])\r | |
105 | \r | |
106 | def test_call_kwargs(self):\r | |
107 | # call() function with keyword args\r | |
108 | newenv = os.environ.copy()\r | |
109 | newenv["FRUIT"] = "banana"\r | |
110 | rc = subprocess.call([sys.executable, "-c",\r | |
111 | 'import sys, os;'\r | |
112 | 'sys.exit(os.getenv("FRUIT")=="banana")'],\r | |
113 | env=newenv)\r | |
114 | self.assertEqual(rc, 1)\r | |
115 | \r | |
116 | def test_stdin_none(self):\r | |
117 | # .stdin is None when not redirected\r | |
118 | p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],\r | |
119 | stdout=subprocess.PIPE, stderr=subprocess.PIPE)\r | |
120 | self.addCleanup(p.stdout.close)\r | |
121 | self.addCleanup(p.stderr.close)\r | |
122 | p.wait()\r | |
123 | self.assertEqual(p.stdin, None)\r | |
124 | \r | |
125 | def test_stdout_none(self):\r | |
126 | # .stdout is None when not redirected\r | |
127 | p = subprocess.Popen([sys.executable, "-c",\r | |
128 | 'print " this bit of output is from a '\r | |
129 | 'test of stdout in a different '\r | |
130 | 'process ..."'],\r | |
131 | stdin=subprocess.PIPE, stderr=subprocess.PIPE)\r | |
132 | self.addCleanup(p.stdin.close)\r | |
133 | self.addCleanup(p.stderr.close)\r | |
134 | p.wait()\r | |
135 | self.assertEqual(p.stdout, None)\r | |
136 | \r | |
137 | def test_stderr_none(self):\r | |
138 | # .stderr is None when not redirected\r | |
139 | p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],\r | |
140 | stdin=subprocess.PIPE, stdout=subprocess.PIPE)\r | |
141 | self.addCleanup(p.stdout.close)\r | |
142 | self.addCleanup(p.stdin.close)\r | |
143 | p.wait()\r | |
144 | self.assertEqual(p.stderr, None)\r | |
145 | \r | |
146 | def test_executable_with_cwd(self):\r | |
147 | python_dir = os.path.dirname(os.path.realpath(sys.executable))\r | |
148 | p = subprocess.Popen(["somethingyoudonthave", "-c",\r | |
149 | "import sys; sys.exit(47)"],\r | |
150 | executable=sys.executable, cwd=python_dir)\r | |
151 | p.wait()\r | |
152 | self.assertEqual(p.returncode, 47)\r | |
153 | \r | |
154 | @unittest.skipIf(sysconfig.is_python_build(),\r | |
155 | "need an installed Python. See #7774")\r | |
156 | def test_executable_without_cwd(self):\r | |
157 | # For a normal installation, it should work without 'cwd'\r | |
158 | # argument. For test runs in the build directory, see #7774.\r | |
159 | p = subprocess.Popen(["somethingyoudonthave", "-c",\r | |
160 | "import sys; sys.exit(47)"],\r | |
161 | executable=sys.executable)\r | |
162 | p.wait()\r | |
163 | self.assertEqual(p.returncode, 47)\r | |
164 | \r | |
165 | def test_stdin_pipe(self):\r | |
166 | # stdin redirection\r | |
167 | p = subprocess.Popen([sys.executable, "-c",\r | |
168 | 'import sys; sys.exit(sys.stdin.read() == "pear")'],\r | |
169 | stdin=subprocess.PIPE)\r | |
170 | p.stdin.write("pear")\r | |
171 | p.stdin.close()\r | |
172 | p.wait()\r | |
173 | self.assertEqual(p.returncode, 1)\r | |
174 | \r | |
175 | def test_stdin_filedes(self):\r | |
176 | # stdin is set to open file descriptor\r | |
177 | tf = tempfile.TemporaryFile()\r | |
178 | d = tf.fileno()\r | |
179 | os.write(d, "pear")\r | |
180 | os.lseek(d, 0, 0)\r | |
181 | p = subprocess.Popen([sys.executable, "-c",\r | |
182 | 'import sys; sys.exit(sys.stdin.read() == "pear")'],\r | |
183 | stdin=d)\r | |
184 | p.wait()\r | |
185 | self.assertEqual(p.returncode, 1)\r | |
186 | \r | |
187 | def test_stdin_fileobj(self):\r | |
188 | # stdin is set to open file object\r | |
189 | tf = tempfile.TemporaryFile()\r | |
190 | tf.write("pear")\r | |
191 | tf.seek(0)\r | |
192 | p = subprocess.Popen([sys.executable, "-c",\r | |
193 | 'import sys; sys.exit(sys.stdin.read() == "pear")'],\r | |
194 | stdin=tf)\r | |
195 | p.wait()\r | |
196 | self.assertEqual(p.returncode, 1)\r | |
197 | \r | |
198 | def test_stdout_pipe(self):\r | |
199 | # stdout redirection\r | |
200 | p = subprocess.Popen([sys.executable, "-c",\r | |
201 | 'import sys; sys.stdout.write("orange")'],\r | |
202 | stdout=subprocess.PIPE)\r | |
203 | self.addCleanup(p.stdout.close)\r | |
204 | self.assertEqual(p.stdout.read(), "orange")\r | |
205 | \r | |
206 | def test_stdout_filedes(self):\r | |
207 | # stdout is set to open file descriptor\r | |
208 | tf = tempfile.TemporaryFile()\r | |
209 | d = tf.fileno()\r | |
210 | p = subprocess.Popen([sys.executable, "-c",\r | |
211 | 'import sys; sys.stdout.write("orange")'],\r | |
212 | stdout=d)\r | |
213 | p.wait()\r | |
214 | os.lseek(d, 0, 0)\r | |
215 | self.assertEqual(os.read(d, 1024), "orange")\r | |
216 | \r | |
217 | def test_stdout_fileobj(self):\r | |
218 | # stdout is set to open file object\r | |
219 | tf = tempfile.TemporaryFile()\r | |
220 | p = subprocess.Popen([sys.executable, "-c",\r | |
221 | 'import sys; sys.stdout.write("orange")'],\r | |
222 | stdout=tf)\r | |
223 | p.wait()\r | |
224 | tf.seek(0)\r | |
225 | self.assertEqual(tf.read(), "orange")\r | |
226 | \r | |
227 | def test_stderr_pipe(self):\r | |
228 | # stderr redirection\r | |
229 | p = subprocess.Popen([sys.executable, "-c",\r | |
230 | 'import sys; sys.stderr.write("strawberry")'],\r | |
231 | stderr=subprocess.PIPE)\r | |
232 | self.addCleanup(p.stderr.close)\r | |
233 | self.assertStderrEqual(p.stderr.read(), "strawberry")\r | |
234 | \r | |
235 | def test_stderr_filedes(self):\r | |
236 | # stderr is set to open file descriptor\r | |
237 | tf = tempfile.TemporaryFile()\r | |
238 | d = tf.fileno()\r | |
239 | p = subprocess.Popen([sys.executable, "-c",\r | |
240 | 'import sys; sys.stderr.write("strawberry")'],\r | |
241 | stderr=d)\r | |
242 | p.wait()\r | |
243 | os.lseek(d, 0, 0)\r | |
244 | self.assertStderrEqual(os.read(d, 1024), "strawberry")\r | |
245 | \r | |
246 | def test_stderr_fileobj(self):\r | |
247 | # stderr is set to open file object\r | |
248 | tf = tempfile.TemporaryFile()\r | |
249 | p = subprocess.Popen([sys.executable, "-c",\r | |
250 | 'import sys; sys.stderr.write("strawberry")'],\r | |
251 | stderr=tf)\r | |
252 | p.wait()\r | |
253 | tf.seek(0)\r | |
254 | self.assertStderrEqual(tf.read(), "strawberry")\r | |
255 | \r | |
256 | def test_stdout_stderr_pipe(self):\r | |
257 | # capture stdout and stderr to the same pipe\r | |
258 | p = subprocess.Popen([sys.executable, "-c",\r | |
259 | 'import sys;'\r | |
260 | 'sys.stdout.write("apple");'\r | |
261 | 'sys.stdout.flush();'\r | |
262 | 'sys.stderr.write("orange")'],\r | |
263 | stdout=subprocess.PIPE,\r | |
264 | stderr=subprocess.STDOUT)\r | |
265 | self.addCleanup(p.stdout.close)\r | |
266 | self.assertStderrEqual(p.stdout.read(), "appleorange")\r | |
267 | \r | |
268 | def test_stdout_stderr_file(self):\r | |
269 | # capture stdout and stderr to the same open file\r | |
270 | tf = tempfile.TemporaryFile()\r | |
271 | p = subprocess.Popen([sys.executable, "-c",\r | |
272 | 'import sys;'\r | |
273 | 'sys.stdout.write("apple");'\r | |
274 | 'sys.stdout.flush();'\r | |
275 | 'sys.stderr.write("orange")'],\r | |
276 | stdout=tf,\r | |
277 | stderr=tf)\r | |
278 | p.wait()\r | |
279 | tf.seek(0)\r | |
280 | self.assertStderrEqual(tf.read(), "appleorange")\r | |
281 | \r | |
282 | def test_stdout_filedes_of_stdout(self):\r | |
283 | # stdout is set to 1 (#1531862).\r | |
284 | cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"\r | |
285 | rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)\r | |
286 | self.assertEqual(rc, 2)\r | |
287 | \r | |
288 | def test_cwd(self):\r | |
289 | tmpdir = tempfile.gettempdir()\r | |
290 | # We cannot use os.path.realpath to canonicalize the path,\r | |
291 | # since it doesn't expand Tru64 {memb} strings. See bug 1063571.\r | |
292 | cwd = os.getcwd()\r | |
293 | os.chdir(tmpdir)\r | |
294 | tmpdir = os.getcwd()\r | |
295 | os.chdir(cwd)\r | |
296 | p = subprocess.Popen([sys.executable, "-c",\r | |
297 | 'import sys,os;'\r | |
298 | 'sys.stdout.write(os.getcwd())'],\r | |
299 | stdout=subprocess.PIPE,\r | |
300 | cwd=tmpdir)\r | |
301 | self.addCleanup(p.stdout.close)\r | |
302 | normcase = os.path.normcase\r | |
303 | self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))\r | |
304 | \r | |
305 | def test_env(self):\r | |
306 | newenv = os.environ.copy()\r | |
307 | newenv["FRUIT"] = "orange"\r | |
308 | p = subprocess.Popen([sys.executable, "-c",\r | |
309 | 'import sys,os;'\r | |
310 | 'sys.stdout.write(os.getenv("FRUIT"))'],\r | |
311 | stdout=subprocess.PIPE,\r | |
312 | env=newenv)\r | |
313 | self.addCleanup(p.stdout.close)\r | |
314 | self.assertEqual(p.stdout.read(), "orange")\r | |
315 | \r | |
316 | def test_communicate_stdin(self):\r | |
317 | p = subprocess.Popen([sys.executable, "-c",\r | |
318 | 'import sys;'\r | |
319 | 'sys.exit(sys.stdin.read() == "pear")'],\r | |
320 | stdin=subprocess.PIPE)\r | |
321 | p.communicate("pear")\r | |
322 | self.assertEqual(p.returncode, 1)\r | |
323 | \r | |
324 | def test_communicate_stdout(self):\r | |
325 | p = subprocess.Popen([sys.executable, "-c",\r | |
326 | 'import sys; sys.stdout.write("pineapple")'],\r | |
327 | stdout=subprocess.PIPE)\r | |
328 | (stdout, stderr) = p.communicate()\r | |
329 | self.assertEqual(stdout, "pineapple")\r | |
330 | self.assertEqual(stderr, None)\r | |
331 | \r | |
332 | def test_communicate_stderr(self):\r | |
333 | p = subprocess.Popen([sys.executable, "-c",\r | |
334 | 'import sys; sys.stderr.write("pineapple")'],\r | |
335 | stderr=subprocess.PIPE)\r | |
336 | (stdout, stderr) = p.communicate()\r | |
337 | self.assertEqual(stdout, None)\r | |
338 | self.assertStderrEqual(stderr, "pineapple")\r | |
339 | \r | |
340 | def test_communicate(self):\r | |
341 | p = subprocess.Popen([sys.executable, "-c",\r | |
342 | 'import sys,os;'\r | |
343 | 'sys.stderr.write("pineapple");'\r | |
344 | 'sys.stdout.write(sys.stdin.read())'],\r | |
345 | stdin=subprocess.PIPE,\r | |
346 | stdout=subprocess.PIPE,\r | |
347 | stderr=subprocess.PIPE)\r | |
348 | self.addCleanup(p.stdout.close)\r | |
349 | self.addCleanup(p.stderr.close)\r | |
350 | self.addCleanup(p.stdin.close)\r | |
351 | (stdout, stderr) = p.communicate("banana")\r | |
352 | self.assertEqual(stdout, "banana")\r | |
353 | self.assertStderrEqual(stderr, "pineapple")\r | |
354 | \r | |
355 | # This test is Linux specific for simplicity to at least have\r | |
356 | # some coverage. It is not a platform specific bug.\r | |
357 | @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),\r | |
358 | "Linux specific")\r | |
359 | # Test for the fd leak reported in http://bugs.python.org/issue2791.\r | |
360 | def test_communicate_pipe_fd_leak(self):\r | |
361 | fd_directory = '/proc/%d/fd' % os.getpid()\r | |
362 | num_fds_before_popen = len(os.listdir(fd_directory))\r | |
363 | p = subprocess.Popen([sys.executable, "-c", "print()"],\r | |
364 | stdout=subprocess.PIPE)\r | |
365 | p.communicate()\r | |
366 | num_fds_after_communicate = len(os.listdir(fd_directory))\r | |
367 | del p\r | |
368 | num_fds_after_destruction = len(os.listdir(fd_directory))\r | |
369 | self.assertEqual(num_fds_before_popen, num_fds_after_destruction)\r | |
370 | self.assertEqual(num_fds_before_popen, num_fds_after_communicate)\r | |
371 | \r | |
372 | def test_communicate_returns(self):\r | |
373 | # communicate() should return None if no redirection is active\r | |
374 | p = subprocess.Popen([sys.executable, "-c",\r | |
375 | "import sys; sys.exit(47)"])\r | |
376 | (stdout, stderr) = p.communicate()\r | |
377 | self.assertEqual(stdout, None)\r | |
378 | self.assertEqual(stderr, None)\r | |
379 | \r | |
380 | def test_communicate_pipe_buf(self):\r | |
381 | # communicate() with writes larger than pipe_buf\r | |
382 | # This test will probably deadlock rather than fail, if\r | |
383 | # communicate() does not work properly.\r | |
384 | x, y = os.pipe()\r | |
385 | if mswindows:\r | |
386 | pipe_buf = 512\r | |
387 | else:\r | |
388 | pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")\r | |
389 | os.close(x)\r | |
390 | os.close(y)\r | |
391 | p = subprocess.Popen([sys.executable, "-c",\r | |
392 | 'import sys,os;'\r | |
393 | 'sys.stdout.write(sys.stdin.read(47));'\r | |
394 | 'sys.stderr.write("xyz"*%d);'\r | |
395 | 'sys.stdout.write(sys.stdin.read())' % pipe_buf],\r | |
396 | stdin=subprocess.PIPE,\r | |
397 | stdout=subprocess.PIPE,\r | |
398 | stderr=subprocess.PIPE)\r | |
399 | self.addCleanup(p.stdout.close)\r | |
400 | self.addCleanup(p.stderr.close)\r | |
401 | self.addCleanup(p.stdin.close)\r | |
402 | string_to_write = "abc"*pipe_buf\r | |
403 | (stdout, stderr) = p.communicate(string_to_write)\r | |
404 | self.assertEqual(stdout, string_to_write)\r | |
405 | \r | |
406 | def test_writes_before_communicate(self):\r | |
407 | # stdin.write before communicate()\r | |
408 | p = subprocess.Popen([sys.executable, "-c",\r | |
409 | 'import sys,os;'\r | |
410 | 'sys.stdout.write(sys.stdin.read())'],\r | |
411 | stdin=subprocess.PIPE,\r | |
412 | stdout=subprocess.PIPE,\r | |
413 | stderr=subprocess.PIPE)\r | |
414 | self.addCleanup(p.stdout.close)\r | |
415 | self.addCleanup(p.stderr.close)\r | |
416 | self.addCleanup(p.stdin.close)\r | |
417 | p.stdin.write("banana")\r | |
418 | (stdout, stderr) = p.communicate("split")\r | |
419 | self.assertEqual(stdout, "bananasplit")\r | |
420 | self.assertStderrEqual(stderr, "")\r | |
421 | \r | |
422 | def test_universal_newlines(self):\r | |
423 | p = subprocess.Popen([sys.executable, "-c",\r | |
424 | 'import sys,os;' + SETBINARY +\r | |
425 | 'sys.stdout.write("line1\\n");'\r | |
426 | 'sys.stdout.flush();'\r | |
427 | 'sys.stdout.write("line2\\r");'\r | |
428 | 'sys.stdout.flush();'\r | |
429 | 'sys.stdout.write("line3\\r\\n");'\r | |
430 | 'sys.stdout.flush();'\r | |
431 | 'sys.stdout.write("line4\\r");'\r | |
432 | 'sys.stdout.flush();'\r | |
433 | 'sys.stdout.write("\\nline5");'\r | |
434 | 'sys.stdout.flush();'\r | |
435 | 'sys.stdout.write("\\nline6");'],\r | |
436 | stdout=subprocess.PIPE,\r | |
437 | universal_newlines=1)\r | |
438 | self.addCleanup(p.stdout.close)\r | |
439 | stdout = p.stdout.read()\r | |
440 | if hasattr(file, 'newlines'):\r | |
441 | # Interpreter with universal newline support\r | |
442 | self.assertEqual(stdout,\r | |
443 | "line1\nline2\nline3\nline4\nline5\nline6")\r | |
444 | else:\r | |
445 | # Interpreter without universal newline support\r | |
446 | self.assertEqual(stdout,\r | |
447 | "line1\nline2\rline3\r\nline4\r\nline5\nline6")\r | |
448 | \r | |
449 | def test_universal_newlines_communicate(self):\r | |
450 | # universal newlines through communicate()\r | |
451 | p = subprocess.Popen([sys.executable, "-c",\r | |
452 | 'import sys,os;' + SETBINARY +\r | |
453 | 'sys.stdout.write("line1\\n");'\r | |
454 | 'sys.stdout.flush();'\r | |
455 | 'sys.stdout.write("line2\\r");'\r | |
456 | 'sys.stdout.flush();'\r | |
457 | 'sys.stdout.write("line3\\r\\n");'\r | |
458 | 'sys.stdout.flush();'\r | |
459 | 'sys.stdout.write("line4\\r");'\r | |
460 | 'sys.stdout.flush();'\r | |
461 | 'sys.stdout.write("\\nline5");'\r | |
462 | 'sys.stdout.flush();'\r | |
463 | 'sys.stdout.write("\\nline6");'],\r | |
464 | stdout=subprocess.PIPE, stderr=subprocess.PIPE,\r | |
465 | universal_newlines=1)\r | |
466 | self.addCleanup(p.stdout.close)\r | |
467 | self.addCleanup(p.stderr.close)\r | |
468 | (stdout, stderr) = p.communicate()\r | |
469 | if hasattr(file, 'newlines'):\r | |
470 | # Interpreter with universal newline support\r | |
471 | self.assertEqual(stdout,\r | |
472 | "line1\nline2\nline3\nline4\nline5\nline6")\r | |
473 | else:\r | |
474 | # Interpreter without universal newline support\r | |
475 | self.assertEqual(stdout,\r | |
476 | "line1\nline2\rline3\r\nline4\r\nline5\nline6")\r | |
477 | \r | |
478 | def test_no_leaking(self):\r | |
479 | # Make sure we leak no resources\r | |
480 | if not mswindows:\r | |
481 | max_handles = 1026 # too much for most UNIX systems\r | |
482 | else:\r | |
483 | max_handles = 2050 # too much for (at least some) Windows setups\r | |
484 | handles = []\r | |
485 | try:\r | |
486 | for i in range(max_handles):\r | |
487 | try:\r | |
488 | handles.append(os.open(test_support.TESTFN,\r | |
489 | os.O_WRONLY | os.O_CREAT))\r | |
490 | except OSError as e:\r | |
491 | if e.errno != errno.EMFILE:\r | |
492 | raise\r | |
493 | break\r | |
494 | else:\r | |
495 | self.skipTest("failed to reach the file descriptor limit "\r | |
496 | "(tried %d)" % max_handles)\r | |
497 | # Close a couple of them (should be enough for a subprocess)\r | |
498 | for i in range(10):\r | |
499 | os.close(handles.pop())\r | |
500 | # Loop creating some subprocesses. If one of them leaks some fds,\r | |
501 | # the next loop iteration will fail by reaching the max fd limit.\r | |
502 | for i in range(15):\r | |
503 | p = subprocess.Popen([sys.executable, "-c",\r | |
504 | "import sys;"\r | |
505 | "sys.stdout.write(sys.stdin.read())"],\r | |
506 | stdin=subprocess.PIPE,\r | |
507 | stdout=subprocess.PIPE,\r | |
508 | stderr=subprocess.PIPE)\r | |
509 | data = p.communicate(b"lime")[0]\r | |
510 | self.assertEqual(data, b"lime")\r | |
511 | finally:\r | |
512 | for h in handles:\r | |
513 | os.close(h)\r | |
514 | \r | |
515 | def test_list2cmdline(self):\r | |
516 | self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),\r | |
517 | '"a b c" d e')\r | |
518 | self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),\r | |
519 | 'ab\\"c \\ d')\r | |
520 | self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),\r | |
521 | 'ab\\"c " \\\\" d')\r | |
522 | self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),\r | |
523 | 'a\\\\\\b "de fg" h')\r | |
524 | self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),\r | |
525 | 'a\\\\\\"b c d')\r | |
526 | self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),\r | |
527 | '"a\\\\b c" d e')\r | |
528 | self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),\r | |
529 | '"a\\\\b\\ c" d e')\r | |
530 | self.assertEqual(subprocess.list2cmdline(['ab', '']),\r | |
531 | 'ab ""')\r | |
532 | \r | |
533 | \r | |
534 | def test_poll(self):\r | |
535 | p = subprocess.Popen([sys.executable,\r | |
536 | "-c", "import time; time.sleep(1)"])\r | |
537 | count = 0\r | |
538 | while p.poll() is None:\r | |
539 | time.sleep(0.1)\r | |
540 | count += 1\r | |
541 | # We expect that the poll loop probably went around about 10 times,\r | |
542 | # but, based on system scheduling we can't control, it's possible\r | |
543 | # poll() never returned None. It "should be" very rare that it\r | |
544 | # didn't go around at least twice.\r | |
545 | self.assertGreaterEqual(count, 2)\r | |
546 | # Subsequent invocations should just return the returncode\r | |
547 | self.assertEqual(p.poll(), 0)\r | |
548 | \r | |
549 | \r | |
550 | def test_wait(self):\r | |
551 | p = subprocess.Popen([sys.executable,\r | |
552 | "-c", "import time; time.sleep(2)"])\r | |
553 | self.assertEqual(p.wait(), 0)\r | |
554 | # Subsequent invocations should just return the returncode\r | |
555 | self.assertEqual(p.wait(), 0)\r | |
556 | \r | |
557 | \r | |
558 | def test_invalid_bufsize(self):\r | |
559 | # an invalid type of the bufsize argument should raise\r | |
560 | # TypeError.\r | |
561 | with self.assertRaises(TypeError):\r | |
562 | subprocess.Popen([sys.executable, "-c", "pass"], "orange")\r | |
563 | \r | |
564 | def test_leaking_fds_on_error(self):\r | |
565 | # see bug #5179: Popen leaks file descriptors to PIPEs if\r | |
566 | # the child fails to execute; this will eventually exhaust\r | |
567 | # the maximum number of open fds. 1024 seems a very common\r | |
568 | # value for that limit, but Windows has 2048, so we loop\r | |
569 | # 1024 times (each call leaked two fds).\r | |
570 | for i in range(1024):\r | |
571 | # Windows raises IOError. Others raise OSError.\r | |
572 | with self.assertRaises(EnvironmentError) as c:\r | |
573 | subprocess.Popen(['nonexisting_i_hope'],\r | |
574 | stdout=subprocess.PIPE,\r | |
575 | stderr=subprocess.PIPE)\r | |
576 | # ignore errors that indicate the command was not found\r | |
577 | if c.exception.errno not in (errno.ENOENT, errno.EACCES):\r | |
578 | raise c.exception\r | |
579 | \r | |
580 | def test_handles_closed_on_exception(self):\r | |
581 | # If CreateProcess exits with an error, ensure the\r | |
582 | # duplicate output handles are released\r | |
583 | ifhandle, ifname = mkstemp()\r | |
584 | ofhandle, ofname = mkstemp()\r | |
585 | efhandle, efname = mkstemp()\r | |
586 | try:\r | |
587 | subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,\r | |
588 | stderr=efhandle)\r | |
589 | except OSError:\r | |
590 | os.close(ifhandle)\r | |
591 | os.remove(ifname)\r | |
592 | os.close(ofhandle)\r | |
593 | os.remove(ofname)\r | |
594 | os.close(efhandle)\r | |
595 | os.remove(efname)\r | |
596 | self.assertFalse(os.path.exists(ifname))\r | |
597 | self.assertFalse(os.path.exists(ofname))\r | |
598 | self.assertFalse(os.path.exists(efname))\r | |
599 | \r | |
600 | def test_communicate_epipe(self):\r | |
601 | # Issue 10963: communicate() should hide EPIPE\r | |
602 | p = subprocess.Popen([sys.executable, "-c", 'pass'],\r | |
603 | stdin=subprocess.PIPE,\r | |
604 | stdout=subprocess.PIPE,\r | |
605 | stderr=subprocess.PIPE)\r | |
606 | self.addCleanup(p.stdout.close)\r | |
607 | self.addCleanup(p.stderr.close)\r | |
608 | self.addCleanup(p.stdin.close)\r | |
609 | p.communicate("x" * 2**20)\r | |
610 | \r | |
611 | def test_communicate_epipe_only_stdin(self):\r | |
612 | # Issue 10963: communicate() should hide EPIPE\r | |
613 | p = subprocess.Popen([sys.executable, "-c", 'pass'],\r | |
614 | stdin=subprocess.PIPE)\r | |
615 | self.addCleanup(p.stdin.close)\r | |
616 | time.sleep(2)\r | |
617 | p.communicate("x" * 2**20)\r | |
618 | \r | |
619 | # context manager\r | |
620 | class _SuppressCoreFiles(object):\r | |
621 | """Try to prevent core files from being created."""\r | |
622 | old_limit = None\r | |
623 | \r | |
624 | def __enter__(self):\r | |
625 | """Try to save previous ulimit, then set it to (0, 0)."""\r | |
626 | try:\r | |
627 | import resource\r | |
628 | self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)\r | |
629 | resource.setrlimit(resource.RLIMIT_CORE, (0, 0))\r | |
630 | except (ImportError, ValueError, resource.error):\r | |
631 | pass\r | |
632 | \r | |
633 | if sys.platform == 'darwin':\r | |
634 | # Check if the 'Crash Reporter' on OSX was configured\r | |
635 | # in 'Developer' mode and warn that it will get triggered\r | |
636 | # when it is.\r | |
637 | #\r | |
638 | # This assumes that this context manager is used in tests\r | |
639 | # that might trigger the next manager.\r | |
640 | value = subprocess.Popen(['/usr/bin/defaults', 'read',\r | |
641 | 'com.apple.CrashReporter', 'DialogType'],\r | |
642 | stdout=subprocess.PIPE).communicate()[0]\r | |
643 | if value.strip() == b'developer':\r | |
644 | print "this tests triggers the Crash Reporter, that is intentional"\r | |
645 | sys.stdout.flush()\r | |
646 | \r | |
647 | def __exit__(self, *args):\r | |
648 | """Return core file behavior to default."""\r | |
649 | if self.old_limit is None:\r | |
650 | return\r | |
651 | try:\r | |
652 | import resource\r | |
653 | resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)\r | |
654 | except (ImportError, ValueError, resource.error):\r | |
655 | pass\r | |
656 | \r | |
657 | \r | |
658 | @unittest.skipIf(mswindows, "POSIX specific tests")\r | |
659 | class POSIXProcessTestCase(BaseTestCase):\r | |
660 | \r | |
661 | def test_exceptions(self):\r | |
662 | # caught & re-raised exceptions\r | |
663 | with self.assertRaises(OSError) as c:\r | |
664 | p = subprocess.Popen([sys.executable, "-c", ""],\r | |
665 | cwd="/this/path/does/not/exist")\r | |
666 | # The attribute child_traceback should contain "os.chdir" somewhere.\r | |
667 | self.assertIn("os.chdir", c.exception.child_traceback)\r | |
668 | \r | |
669 | def test_run_abort(self):\r | |
670 | # returncode handles signal termination\r | |
671 | with _SuppressCoreFiles():\r | |
672 | p = subprocess.Popen([sys.executable, "-c",\r | |
673 | "import os; os.abort()"])\r | |
674 | p.wait()\r | |
675 | self.assertEqual(-p.returncode, signal.SIGABRT)\r | |
676 | \r | |
677 | def test_preexec(self):\r | |
678 | # preexec function\r | |
679 | p = subprocess.Popen([sys.executable, "-c",\r | |
680 | "import sys, os;"\r | |
681 | "sys.stdout.write(os.getenv('FRUIT'))"],\r | |
682 | stdout=subprocess.PIPE,\r | |
683 | preexec_fn=lambda: os.putenv("FRUIT", "apple"))\r | |
684 | self.addCleanup(p.stdout.close)\r | |
685 | self.assertEqual(p.stdout.read(), "apple")\r | |
686 | \r | |
687 | def test_args_string(self):\r | |
688 | # args is a string\r | |
689 | f, fname = mkstemp()\r | |
690 | os.write(f, "#!/bin/sh\n")\r | |
691 | os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %\r | |
692 | sys.executable)\r | |
693 | os.close(f)\r | |
694 | os.chmod(fname, 0o700)\r | |
695 | p = subprocess.Popen(fname)\r | |
696 | p.wait()\r | |
697 | os.remove(fname)\r | |
698 | self.assertEqual(p.returncode, 47)\r | |
699 | \r | |
700 | def test_invalid_args(self):\r | |
701 | # invalid arguments should raise ValueError\r | |
702 | self.assertRaises(ValueError, subprocess.call,\r | |
703 | [sys.executable, "-c",\r | |
704 | "import sys; sys.exit(47)"],\r | |
705 | startupinfo=47)\r | |
706 | self.assertRaises(ValueError, subprocess.call,\r | |
707 | [sys.executable, "-c",\r | |
708 | "import sys; sys.exit(47)"],\r | |
709 | creationflags=47)\r | |
710 | \r | |
711 | def test_shell_sequence(self):\r | |
712 | # Run command through the shell (sequence)\r | |
713 | newenv = os.environ.copy()\r | |
714 | newenv["FRUIT"] = "apple"\r | |
715 | p = subprocess.Popen(["echo $FRUIT"], shell=1,\r | |
716 | stdout=subprocess.PIPE,\r | |
717 | env=newenv)\r | |
718 | self.addCleanup(p.stdout.close)\r | |
719 | self.assertEqual(p.stdout.read().strip(), "apple")\r | |
720 | \r | |
721 | def test_shell_string(self):\r | |
722 | # Run command through the shell (string)\r | |
723 | newenv = os.environ.copy()\r | |
724 | newenv["FRUIT"] = "apple"\r | |
725 | p = subprocess.Popen("echo $FRUIT", shell=1,\r | |
726 | stdout=subprocess.PIPE,\r | |
727 | env=newenv)\r | |
728 | self.addCleanup(p.stdout.close)\r | |
729 | self.assertEqual(p.stdout.read().strip(), "apple")\r | |
730 | \r | |
731 | def test_call_string(self):\r | |
732 | # call() function with string argument on UNIX\r | |
733 | f, fname = mkstemp()\r | |
734 | os.write(f, "#!/bin/sh\n")\r | |
735 | os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %\r | |
736 | sys.executable)\r | |
737 | os.close(f)\r | |
738 | os.chmod(fname, 0700)\r | |
739 | rc = subprocess.call(fname)\r | |
740 | os.remove(fname)\r | |
741 | self.assertEqual(rc, 47)\r | |
742 | \r | |
743 | def test_specific_shell(self):\r | |
744 | # Issue #9265: Incorrect name passed as arg[0].\r | |
745 | shells = []\r | |
746 | for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:\r | |
747 | for name in ['bash', 'ksh']:\r | |
748 | sh = os.path.join(prefix, name)\r | |
749 | if os.path.isfile(sh):\r | |
750 | shells.append(sh)\r | |
751 | if not shells: # Will probably work for any shell but csh.\r | |
752 | self.skipTest("bash or ksh required for this test")\r | |
753 | sh = '/bin/sh'\r | |
754 | if os.path.isfile(sh) and not os.path.islink(sh):\r | |
755 | # Test will fail if /bin/sh is a symlink to csh.\r | |
756 | shells.append(sh)\r | |
757 | for sh in shells:\r | |
758 | p = subprocess.Popen("echo $0", executable=sh, shell=True,\r | |
759 | stdout=subprocess.PIPE)\r | |
760 | self.addCleanup(p.stdout.close)\r | |
761 | self.assertEqual(p.stdout.read().strip(), sh)\r | |
762 | \r | |
763 | def _kill_process(self, method, *args):\r | |
764 | # Do not inherit file handles from the parent.\r | |
765 | # It should fix failures on some platforms.\r | |
766 | p = subprocess.Popen([sys.executable, "-c", """if 1:\r | |
767 | import sys, time\r | |
768 | sys.stdout.write('x\\n')\r | |
769 | sys.stdout.flush()\r | |
770 | time.sleep(30)\r | |
771 | """],\r | |
772 | close_fds=True,\r | |
773 | stdin=subprocess.PIPE,\r | |
774 | stdout=subprocess.PIPE,\r | |
775 | stderr=subprocess.PIPE)\r | |
776 | # Wait for the interpreter to be completely initialized before\r | |
777 | # sending any signal.\r | |
778 | p.stdout.read(1)\r | |
779 | getattr(p, method)(*args)\r | |
780 | return p\r | |
781 | \r | |
782 | def test_send_signal(self):\r | |
783 | p = self._kill_process('send_signal', signal.SIGINT)\r | |
784 | _, stderr = p.communicate()\r | |
785 | self.assertIn('KeyboardInterrupt', stderr)\r | |
786 | self.assertNotEqual(p.wait(), 0)\r | |
787 | \r | |
788 | def test_kill(self):\r | |
789 | p = self._kill_process('kill')\r | |
790 | _, stderr = p.communicate()\r | |
791 | self.assertStderrEqual(stderr, '')\r | |
792 | self.assertEqual(p.wait(), -signal.SIGKILL)\r | |
793 | \r | |
794 | def test_terminate(self):\r | |
795 | p = self._kill_process('terminate')\r | |
796 | _, stderr = p.communicate()\r | |
797 | self.assertStderrEqual(stderr, '')\r | |
798 | self.assertEqual(p.wait(), -signal.SIGTERM)\r | |
799 | \r | |
800 | def check_close_std_fds(self, fds):\r | |
801 | # Issue #9905: test that subprocess pipes still work properly with\r | |
802 | # some standard fds closed\r | |
803 | stdin = 0\r | |
804 | newfds = []\r | |
805 | for a in fds:\r | |
806 | b = os.dup(a)\r | |
807 | newfds.append(b)\r | |
808 | if a == 0:\r | |
809 | stdin = b\r | |
810 | try:\r | |
811 | for fd in fds:\r | |
812 | os.close(fd)\r | |
813 | out, err = subprocess.Popen([sys.executable, "-c",\r | |
814 | 'import sys;'\r | |
815 | 'sys.stdout.write("apple");'\r | |
816 | 'sys.stdout.flush();'\r | |
817 | 'sys.stderr.write("orange")'],\r | |
818 | stdin=stdin,\r | |
819 | stdout=subprocess.PIPE,\r | |
820 | stderr=subprocess.PIPE).communicate()\r | |
821 | err = test_support.strip_python_stderr(err)\r | |
822 | self.assertEqual((out, err), (b'apple', b'orange'))\r | |
823 | finally:\r | |
824 | for b, a in zip(newfds, fds):\r | |
825 | os.dup2(b, a)\r | |
826 | for b in newfds:\r | |
827 | os.close(b)\r | |
828 | \r | |
829 | def test_close_fd_0(self):\r | |
830 | self.check_close_std_fds([0])\r | |
831 | \r | |
832 | def test_close_fd_1(self):\r | |
833 | self.check_close_std_fds([1])\r | |
834 | \r | |
835 | def test_close_fd_2(self):\r | |
836 | self.check_close_std_fds([2])\r | |
837 | \r | |
838 | def test_close_fds_0_1(self):\r | |
839 | self.check_close_std_fds([0, 1])\r | |
840 | \r | |
841 | def test_close_fds_0_2(self):\r | |
842 | self.check_close_std_fds([0, 2])\r | |
843 | \r | |
844 | def test_close_fds_1_2(self):\r | |
845 | self.check_close_std_fds([1, 2])\r | |
846 | \r | |
847 | def test_close_fds_0_1_2(self):\r | |
848 | # Issue #10806: test that subprocess pipes still work properly with\r | |
849 | # all standard fds closed.\r | |
850 | self.check_close_std_fds([0, 1, 2])\r | |
851 | \r | |
852 | def test_wait_when_sigchild_ignored(self):\r | |
853 | # NOTE: sigchild_ignore.py may not be an effective test on all OSes.\r | |
854 | sigchild_ignore = test_support.findfile("sigchild_ignore.py",\r | |
855 | subdir="subprocessdata")\r | |
856 | p = subprocess.Popen([sys.executable, sigchild_ignore],\r | |
857 | stdout=subprocess.PIPE, stderr=subprocess.PIPE)\r | |
858 | stdout, stderr = p.communicate()\r | |
859 | self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"\r | |
860 | " non-zero with this error:\n%s" % stderr)\r | |
861 | \r | |
862 | \r | |
863 | @unittest.skipUnless(mswindows, "Windows specific tests")\r | |
864 | class Win32ProcessTestCase(BaseTestCase):\r | |
865 | \r | |
866 | def test_startupinfo(self):\r | |
867 | # startupinfo argument\r | |
868 | # We uses hardcoded constants, because we do not want to\r | |
869 | # depend on win32all.\r | |
870 | STARTF_USESHOWWINDOW = 1\r | |
871 | SW_MAXIMIZE = 3\r | |
872 | startupinfo = subprocess.STARTUPINFO()\r | |
873 | startupinfo.dwFlags = STARTF_USESHOWWINDOW\r | |
874 | startupinfo.wShowWindow = SW_MAXIMIZE\r | |
875 | # Since Python is a console process, it won't be affected\r | |
876 | # by wShowWindow, but the argument should be silently\r | |
877 | # ignored\r | |
878 | subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],\r | |
879 | startupinfo=startupinfo)\r | |
880 | \r | |
881 | def test_creationflags(self):\r | |
882 | # creationflags argument\r | |
883 | CREATE_NEW_CONSOLE = 16\r | |
884 | sys.stderr.write(" a DOS box should flash briefly ...\n")\r | |
885 | subprocess.call(sys.executable +\r | |
886 | ' -c "import time; time.sleep(0.25)"',\r | |
887 | creationflags=CREATE_NEW_CONSOLE)\r | |
888 | \r | |
889 | def test_invalid_args(self):\r | |
890 | # invalid arguments should raise ValueError\r | |
891 | self.assertRaises(ValueError, subprocess.call,\r | |
892 | [sys.executable, "-c",\r | |
893 | "import sys; sys.exit(47)"],\r | |
894 | preexec_fn=lambda: 1)\r | |
895 | self.assertRaises(ValueError, subprocess.call,\r | |
896 | [sys.executable, "-c",\r | |
897 | "import sys; sys.exit(47)"],\r | |
898 | stdout=subprocess.PIPE,\r | |
899 | close_fds=True)\r | |
900 | \r | |
901 | def test_close_fds(self):\r | |
902 | # close file descriptors\r | |
903 | rc = subprocess.call([sys.executable, "-c",\r | |
904 | "import sys; sys.exit(47)"],\r | |
905 | close_fds=True)\r | |
906 | self.assertEqual(rc, 47)\r | |
907 | \r | |
908 | def test_shell_sequence(self):\r | |
909 | # Run command through the shell (sequence)\r | |
910 | newenv = os.environ.copy()\r | |
911 | newenv["FRUIT"] = "physalis"\r | |
912 | p = subprocess.Popen(["set"], shell=1,\r | |
913 | stdout=subprocess.PIPE,\r | |
914 | env=newenv)\r | |
915 | self.addCleanup(p.stdout.close)\r | |
916 | self.assertIn("physalis", p.stdout.read())\r | |
917 | \r | |
918 | def test_shell_string(self):\r | |
919 | # Run command through the shell (string)\r | |
920 | newenv = os.environ.copy()\r | |
921 | newenv["FRUIT"] = "physalis"\r | |
922 | p = subprocess.Popen("set", shell=1,\r | |
923 | stdout=subprocess.PIPE,\r | |
924 | env=newenv)\r | |
925 | self.addCleanup(p.stdout.close)\r | |
926 | self.assertIn("physalis", p.stdout.read())\r | |
927 | \r | |
928 | def test_call_string(self):\r | |
929 | # call() function with string argument on Windows\r | |
930 | rc = subprocess.call(sys.executable +\r | |
931 | ' -c "import sys; sys.exit(47)"')\r | |
932 | self.assertEqual(rc, 47)\r | |
933 | \r | |
934 | def _kill_process(self, method, *args):\r | |
935 | # Some win32 buildbot raises EOFError if stdin is inherited\r | |
936 | p = subprocess.Popen([sys.executable, "-c", """if 1:\r | |
937 | import sys, time\r | |
938 | sys.stdout.write('x\\n')\r | |
939 | sys.stdout.flush()\r | |
940 | time.sleep(30)\r | |
941 | """],\r | |
942 | stdin=subprocess.PIPE,\r | |
943 | stdout=subprocess.PIPE,\r | |
944 | stderr=subprocess.PIPE)\r | |
945 | self.addCleanup(p.stdout.close)\r | |
946 | self.addCleanup(p.stderr.close)\r | |
947 | self.addCleanup(p.stdin.close)\r | |
948 | # Wait for the interpreter to be completely initialized before\r | |
949 | # sending any signal.\r | |
950 | p.stdout.read(1)\r | |
951 | getattr(p, method)(*args)\r | |
952 | _, stderr = p.communicate()\r | |
953 | self.assertStderrEqual(stderr, '')\r | |
954 | returncode = p.wait()\r | |
955 | self.assertNotEqual(returncode, 0)\r | |
956 | \r | |
957 | def test_send_signal(self):\r | |
958 | self._kill_process('send_signal', signal.SIGTERM)\r | |
959 | \r | |
960 | def test_kill(self):\r | |
961 | self._kill_process('kill')\r | |
962 | \r | |
963 | def test_terminate(self):\r | |
964 | self._kill_process('terminate')\r | |
965 | \r | |
966 | \r | |
967 | @unittest.skipUnless(getattr(subprocess, '_has_poll', False),\r | |
968 | "poll system call not supported")\r | |
969 | class ProcessTestCaseNoPoll(ProcessTestCase):\r | |
970 | def setUp(self):\r | |
971 | subprocess._has_poll = False\r | |
972 | ProcessTestCase.setUp(self)\r | |
973 | \r | |
974 | def tearDown(self):\r | |
975 | subprocess._has_poll = True\r | |
976 | ProcessTestCase.tearDown(self)\r | |
977 | \r | |
978 | \r | |
979 | class HelperFunctionTests(unittest.TestCase):\r | |
980 | @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")\r | |
981 | def test_eintr_retry_call(self):\r | |
982 | record_calls = []\r | |
983 | def fake_os_func(*args):\r | |
984 | record_calls.append(args)\r | |
985 | if len(record_calls) == 2:\r | |
986 | raise OSError(errno.EINTR, "fake interrupted system call")\r | |
987 | return tuple(reversed(args))\r | |
988 | \r | |
989 | self.assertEqual((999, 256),\r | |
990 | subprocess._eintr_retry_call(fake_os_func, 256, 999))\r | |
991 | self.assertEqual([(256, 999)], record_calls)\r | |
992 | # This time there will be an EINTR so it will loop once.\r | |
993 | self.assertEqual((666,),\r | |
994 | subprocess._eintr_retry_call(fake_os_func, 666))\r | |
995 | self.assertEqual([(256, 999), (666,), (666,)], record_calls)\r | |
996 | \r | |
997 | \r | |
998 | @unittest.skipUnless(mswindows, "mswindows only")\r | |
999 | class CommandsWithSpaces (BaseTestCase):\r | |
1000 | \r | |
1001 | def setUp(self):\r | |
1002 | super(CommandsWithSpaces, self).setUp()\r | |
1003 | f, fname = mkstemp(".py", "te st")\r | |
1004 | self.fname = fname.lower ()\r | |
1005 | os.write(f, b"import sys;"\r | |
1006 | b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"\r | |
1007 | )\r | |
1008 | os.close(f)\r | |
1009 | \r | |
1010 | def tearDown(self):\r | |
1011 | os.remove(self.fname)\r | |
1012 | super(CommandsWithSpaces, self).tearDown()\r | |
1013 | \r | |
1014 | def with_spaces(self, *args, **kwargs):\r | |
1015 | kwargs['stdout'] = subprocess.PIPE\r | |
1016 | p = subprocess.Popen(*args, **kwargs)\r | |
1017 | self.addCleanup(p.stdout.close)\r | |
1018 | self.assertEqual(\r | |
1019 | p.stdout.read ().decode("mbcs"),\r | |
1020 | "2 [%r, 'ab cd']" % self.fname\r | |
1021 | )\r | |
1022 | \r | |
1023 | def test_shell_string_with_spaces(self):\r | |
1024 | # call() function with string argument with spaces on Windows\r | |
1025 | self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,\r | |
1026 | "ab cd"), shell=1)\r | |
1027 | \r | |
1028 | def test_shell_sequence_with_spaces(self):\r | |
1029 | # call() function with sequence argument with spaces on Windows\r | |
1030 | self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)\r | |
1031 | \r | |
1032 | def test_noshell_string_with_spaces(self):\r | |
1033 | # call() function with string argument with spaces on Windows\r | |
1034 | self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,\r | |
1035 | "ab cd"))\r | |
1036 | \r | |
1037 | def test_noshell_sequence_with_spaces(self):\r | |
1038 | # call() function with sequence argument with spaces on Windows\r | |
1039 | self.with_spaces([sys.executable, self.fname, "ab cd"])\r | |
1040 | \r | |
1041 | def test_main():\r | |
1042 | unit_tests = (ProcessTestCase,\r | |
1043 | POSIXProcessTestCase,\r | |
1044 | Win32ProcessTestCase,\r | |
1045 | ProcessTestCaseNoPoll,\r | |
1046 | HelperFunctionTests,\r | |
1047 | CommandsWithSpaces)\r | |
1048 | \r | |
1049 | test_support.run_unittest(*unit_tests)\r | |
1050 | test_support.reap_children()\r | |
1051 | \r | |
1052 | if __name__ == "__main__":\r | |
1053 | test_main()\r |