+++ /dev/null
-import unittest\r
-from test import test_support\r
-import subprocess\r
-import sys\r
-import signal\r
-import os\r
-import errno\r
-import tempfile\r
-import time\r
-import re\r
-import sysconfig\r
-\r
-mswindows = (sys.platform == "win32")\r
-\r
-#\r
-# Depends on the following external programs: Python\r
-#\r
-\r
-if mswindows:\r
- SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '\r
- 'os.O_BINARY);')\r
-else:\r
- SETBINARY = ''\r
-\r
-\r
-try:\r
- mkstemp = tempfile.mkstemp\r
-except AttributeError:\r
- # tempfile.mkstemp is not available\r
- def mkstemp():\r
- """Replacement for mkstemp, calling mktemp."""\r
- fname = tempfile.mktemp()\r
- return os.open(fname, os.O_RDWR|os.O_CREAT), fname\r
-\r
-\r
-class BaseTestCase(unittest.TestCase):\r
- def setUp(self):\r
- # Try to minimize the number of children we have so this test\r
- # doesn't crash on some buildbots (Alphas in particular).\r
- test_support.reap_children()\r
-\r
- def tearDown(self):\r
- for inst in subprocess._active:\r
- inst.wait()\r
- subprocess._cleanup()\r
- self.assertFalse(subprocess._active, "subprocess._active not empty")\r
-\r
- def assertStderrEqual(self, stderr, expected, msg=None):\r
- # In a debug build, stuff like "[6580 refs]" is printed to stderr at\r
- # shutdown time. That frustrates tests trying to check stderr produced\r
- # from a spawned Python process.\r
- actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)\r
- self.assertEqual(actual, expected, msg)\r
-\r
-\r
-class ProcessTestCase(BaseTestCase):\r
-\r
- def test_call_seq(self):\r
- # call() function with sequence argument\r
- rc = subprocess.call([sys.executable, "-c",\r
- "import sys; sys.exit(47)"])\r
- self.assertEqual(rc, 47)\r
-\r
- def test_check_call_zero(self):\r
- # check_call() function with zero return code\r
- rc = subprocess.check_call([sys.executable, "-c",\r
- "import sys; sys.exit(0)"])\r
- self.assertEqual(rc, 0)\r
-\r
- def test_check_call_nonzero(self):\r
- # check_call() function with non-zero return code\r
- with self.assertRaises(subprocess.CalledProcessError) as c:\r
- subprocess.check_call([sys.executable, "-c",\r
- "import sys; sys.exit(47)"])\r
- self.assertEqual(c.exception.returncode, 47)\r
-\r
- def test_check_output(self):\r
- # check_output() function with zero return code\r
- output = subprocess.check_output(\r
- [sys.executable, "-c", "print 'BDFL'"])\r
- self.assertIn('BDFL', output)\r
-\r
- def test_check_output_nonzero(self):\r
- # check_call() function with non-zero return code\r
- with self.assertRaises(subprocess.CalledProcessError) as c:\r
- subprocess.check_output(\r
- [sys.executable, "-c", "import sys; sys.exit(5)"])\r
- self.assertEqual(c.exception.returncode, 5)\r
-\r
- def test_check_output_stderr(self):\r
- # check_output() function stderr redirected to stdout\r
- output = subprocess.check_output(\r
- [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],\r
- stderr=subprocess.STDOUT)\r
- self.assertIn('BDFL', output)\r
-\r
- def test_check_output_stdout_arg(self):\r
- # check_output() function stderr redirected to stdout\r
- with self.assertRaises(ValueError) as c:\r
- output = subprocess.check_output(\r
- [sys.executable, "-c", "print 'will not be run'"],\r
- stdout=sys.stdout)\r
- self.fail("Expected ValueError when stdout arg supplied.")\r
- self.assertIn('stdout', c.exception.args[0])\r
-\r
- def test_call_kwargs(self):\r
- # call() function with keyword args\r
- newenv = os.environ.copy()\r
- newenv["FRUIT"] = "banana"\r
- rc = subprocess.call([sys.executable, "-c",\r
- 'import sys, os;'\r
- 'sys.exit(os.getenv("FRUIT")=="banana")'],\r
- env=newenv)\r
- self.assertEqual(rc, 1)\r
-\r
- def test_stdin_none(self):\r
- # .stdin is None when not redirected\r
- p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],\r
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stderr.close)\r
- p.wait()\r
- self.assertEqual(p.stdin, None)\r
-\r
- def test_stdout_none(self):\r
- # .stdout is None when not redirected\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'print " this bit of output is from a '\r
- 'test of stdout in a different '\r
- 'process ..."'],\r
- stdin=subprocess.PIPE, stderr=subprocess.PIPE)\r
- self.addCleanup(p.stdin.close)\r
- self.addCleanup(p.stderr.close)\r
- p.wait()\r
- self.assertEqual(p.stdout, None)\r
-\r
- def test_stderr_none(self):\r
- # .stderr is None when not redirected\r
- p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],\r
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stdin.close)\r
- p.wait()\r
- self.assertEqual(p.stderr, None)\r
-\r
- def test_executable_with_cwd(self):\r
- python_dir = os.path.dirname(os.path.realpath(sys.executable))\r
- p = subprocess.Popen(["somethingyoudonthave", "-c",\r
- "import sys; sys.exit(47)"],\r
- executable=sys.executable, cwd=python_dir)\r
- p.wait()\r
- self.assertEqual(p.returncode, 47)\r
-\r
- @unittest.skipIf(sysconfig.is_python_build(),\r
- "need an installed Python. See #7774")\r
- def test_executable_without_cwd(self):\r
- # For a normal installation, it should work without 'cwd'\r
- # argument. For test runs in the build directory, see #7774.\r
- p = subprocess.Popen(["somethingyoudonthave", "-c",\r
- "import sys; sys.exit(47)"],\r
- executable=sys.executable)\r
- p.wait()\r
- self.assertEqual(p.returncode, 47)\r
-\r
- def test_stdin_pipe(self):\r
- # stdin redirection\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.exit(sys.stdin.read() == "pear")'],\r
- stdin=subprocess.PIPE)\r
- p.stdin.write("pear")\r
- p.stdin.close()\r
- p.wait()\r
- self.assertEqual(p.returncode, 1)\r
-\r
- def test_stdin_filedes(self):\r
- # stdin is set to open file descriptor\r
- tf = tempfile.TemporaryFile()\r
- d = tf.fileno()\r
- os.write(d, "pear")\r
- os.lseek(d, 0, 0)\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.exit(sys.stdin.read() == "pear")'],\r
- stdin=d)\r
- p.wait()\r
- self.assertEqual(p.returncode, 1)\r
-\r
- def test_stdin_fileobj(self):\r
- # stdin is set to open file object\r
- tf = tempfile.TemporaryFile()\r
- tf.write("pear")\r
- tf.seek(0)\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.exit(sys.stdin.read() == "pear")'],\r
- stdin=tf)\r
- p.wait()\r
- self.assertEqual(p.returncode, 1)\r
-\r
- def test_stdout_pipe(self):\r
- # stdout redirection\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stdout.write("orange")'],\r
- stdout=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.assertEqual(p.stdout.read(), "orange")\r
-\r
- def test_stdout_filedes(self):\r
- # stdout is set to open file descriptor\r
- tf = tempfile.TemporaryFile()\r
- d = tf.fileno()\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stdout.write("orange")'],\r
- stdout=d)\r
- p.wait()\r
- os.lseek(d, 0, 0)\r
- self.assertEqual(os.read(d, 1024), "orange")\r
-\r
- def test_stdout_fileobj(self):\r
- # stdout is set to open file object\r
- tf = tempfile.TemporaryFile()\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stdout.write("orange")'],\r
- stdout=tf)\r
- p.wait()\r
- tf.seek(0)\r
- self.assertEqual(tf.read(), "orange")\r
-\r
- def test_stderr_pipe(self):\r
- # stderr redirection\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stderr.write("strawberry")'],\r
- stderr=subprocess.PIPE)\r
- self.addCleanup(p.stderr.close)\r
- self.assertStderrEqual(p.stderr.read(), "strawberry")\r
-\r
- def test_stderr_filedes(self):\r
- # stderr is set to open file descriptor\r
- tf = tempfile.TemporaryFile()\r
- d = tf.fileno()\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stderr.write("strawberry")'],\r
- stderr=d)\r
- p.wait()\r
- os.lseek(d, 0, 0)\r
- self.assertStderrEqual(os.read(d, 1024), "strawberry")\r
-\r
- def test_stderr_fileobj(self):\r
- # stderr is set to open file object\r
- tf = tempfile.TemporaryFile()\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stderr.write("strawberry")'],\r
- stderr=tf)\r
- p.wait()\r
- tf.seek(0)\r
- self.assertStderrEqual(tf.read(), "strawberry")\r
-\r
- def test_stdout_stderr_pipe(self):\r
- # capture stdout and stderr to the same pipe\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys;'\r
- 'sys.stdout.write("apple");'\r
- 'sys.stdout.flush();'\r
- 'sys.stderr.write("orange")'],\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.STDOUT)\r
- self.addCleanup(p.stdout.close)\r
- self.assertStderrEqual(p.stdout.read(), "appleorange")\r
-\r
- def test_stdout_stderr_file(self):\r
- # capture stdout and stderr to the same open file\r
- tf = tempfile.TemporaryFile()\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys;'\r
- 'sys.stdout.write("apple");'\r
- 'sys.stdout.flush();'\r
- 'sys.stderr.write("orange")'],\r
- stdout=tf,\r
- stderr=tf)\r
- p.wait()\r
- tf.seek(0)\r
- self.assertStderrEqual(tf.read(), "appleorange")\r
-\r
- def test_stdout_filedes_of_stdout(self):\r
- # stdout is set to 1 (#1531862).\r
- cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"\r
- rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)\r
- self.assertEqual(rc, 2)\r
-\r
- def test_cwd(self):\r
- tmpdir = tempfile.gettempdir()\r
- # We cannot use os.path.realpath to canonicalize the path,\r
- # since it doesn't expand Tru64 {memb} strings. See bug 1063571.\r
- cwd = os.getcwd()\r
- os.chdir(tmpdir)\r
- tmpdir = os.getcwd()\r
- os.chdir(cwd)\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys,os;'\r
- 'sys.stdout.write(os.getcwd())'],\r
- stdout=subprocess.PIPE,\r
- cwd=tmpdir)\r
- self.addCleanup(p.stdout.close)\r
- normcase = os.path.normcase\r
- self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))\r
-\r
- def test_env(self):\r
- newenv = os.environ.copy()\r
- newenv["FRUIT"] = "orange"\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys,os;'\r
- 'sys.stdout.write(os.getenv("FRUIT"))'],\r
- stdout=subprocess.PIPE,\r
- env=newenv)\r
- self.addCleanup(p.stdout.close)\r
- self.assertEqual(p.stdout.read(), "orange")\r
-\r
- def test_communicate_stdin(self):\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys;'\r
- 'sys.exit(sys.stdin.read() == "pear")'],\r
- stdin=subprocess.PIPE)\r
- p.communicate("pear")\r
- self.assertEqual(p.returncode, 1)\r
-\r
- def test_communicate_stdout(self):\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stdout.write("pineapple")'],\r
- stdout=subprocess.PIPE)\r
- (stdout, stderr) = p.communicate()\r
- self.assertEqual(stdout, "pineapple")\r
- self.assertEqual(stderr, None)\r
-\r
- def test_communicate_stderr(self):\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys; sys.stderr.write("pineapple")'],\r
- stderr=subprocess.PIPE)\r
- (stdout, stderr) = p.communicate()\r
- self.assertEqual(stdout, None)\r
- self.assertStderrEqual(stderr, "pineapple")\r
-\r
- def test_communicate(self):\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys,os;'\r
- 'sys.stderr.write("pineapple");'\r
- 'sys.stdout.write(sys.stdin.read())'],\r
- stdin=subprocess.PIPE,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stderr.close)\r
- self.addCleanup(p.stdin.close)\r
- (stdout, stderr) = p.communicate("banana")\r
- self.assertEqual(stdout, "banana")\r
- self.assertStderrEqual(stderr, "pineapple")\r
-\r
- # This test is Linux specific for simplicity to at least have\r
- # some coverage. It is not a platform specific bug.\r
- @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),\r
- "Linux specific")\r
- # Test for the fd leak reported in http://bugs.python.org/issue2791.\r
- def test_communicate_pipe_fd_leak(self):\r
- fd_directory = '/proc/%d/fd' % os.getpid()\r
- num_fds_before_popen = len(os.listdir(fd_directory))\r
- p = subprocess.Popen([sys.executable, "-c", "print()"],\r
- stdout=subprocess.PIPE)\r
- p.communicate()\r
- num_fds_after_communicate = len(os.listdir(fd_directory))\r
- del p\r
- num_fds_after_destruction = len(os.listdir(fd_directory))\r
- self.assertEqual(num_fds_before_popen, num_fds_after_destruction)\r
- self.assertEqual(num_fds_before_popen, num_fds_after_communicate)\r
-\r
- def test_communicate_returns(self):\r
- # communicate() should return None if no redirection is active\r
- p = subprocess.Popen([sys.executable, "-c",\r
- "import sys; sys.exit(47)"])\r
- (stdout, stderr) = p.communicate()\r
- self.assertEqual(stdout, None)\r
- self.assertEqual(stderr, None)\r
-\r
- def test_communicate_pipe_buf(self):\r
- # communicate() with writes larger than pipe_buf\r
- # This test will probably deadlock rather than fail, if\r
- # communicate() does not work properly.\r
- x, y = os.pipe()\r
- if mswindows:\r
- pipe_buf = 512\r
- else:\r
- pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")\r
- os.close(x)\r
- os.close(y)\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys,os;'\r
- 'sys.stdout.write(sys.stdin.read(47));'\r
- 'sys.stderr.write("xyz"*%d);'\r
- 'sys.stdout.write(sys.stdin.read())' % pipe_buf],\r
- stdin=subprocess.PIPE,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stderr.close)\r
- self.addCleanup(p.stdin.close)\r
- string_to_write = "abc"*pipe_buf\r
- (stdout, stderr) = p.communicate(string_to_write)\r
- self.assertEqual(stdout, string_to_write)\r
-\r
- def test_writes_before_communicate(self):\r
- # stdin.write before communicate()\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys,os;'\r
- 'sys.stdout.write(sys.stdin.read())'],\r
- stdin=subprocess.PIPE,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stderr.close)\r
- self.addCleanup(p.stdin.close)\r
- p.stdin.write("banana")\r
- (stdout, stderr) = p.communicate("split")\r
- self.assertEqual(stdout, "bananasplit")\r
- self.assertStderrEqual(stderr, "")\r
-\r
- def test_universal_newlines(self):\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys,os;' + SETBINARY +\r
- 'sys.stdout.write("line1\\n");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("line2\\r");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("line3\\r\\n");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("line4\\r");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("\\nline5");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("\\nline6");'],\r
- stdout=subprocess.PIPE,\r
- universal_newlines=1)\r
- self.addCleanup(p.stdout.close)\r
- stdout = p.stdout.read()\r
- if hasattr(file, 'newlines'):\r
- # Interpreter with universal newline support\r
- self.assertEqual(stdout,\r
- "line1\nline2\nline3\nline4\nline5\nline6")\r
- else:\r
- # Interpreter without universal newline support\r
- self.assertEqual(stdout,\r
- "line1\nline2\rline3\r\nline4\r\nline5\nline6")\r
-\r
- def test_universal_newlines_communicate(self):\r
- # universal newlines through communicate()\r
- p = subprocess.Popen([sys.executable, "-c",\r
- 'import sys,os;' + SETBINARY +\r
- 'sys.stdout.write("line1\\n");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("line2\\r");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("line3\\r\\n");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("line4\\r");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("\\nline5");'\r
- 'sys.stdout.flush();'\r
- 'sys.stdout.write("\\nline6");'],\r
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,\r
- universal_newlines=1)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stderr.close)\r
- (stdout, stderr) = p.communicate()\r
- if hasattr(file, 'newlines'):\r
- # Interpreter with universal newline support\r
- self.assertEqual(stdout,\r
- "line1\nline2\nline3\nline4\nline5\nline6")\r
- else:\r
- # Interpreter without universal newline support\r
- self.assertEqual(stdout,\r
- "line1\nline2\rline3\r\nline4\r\nline5\nline6")\r
-\r
- def test_no_leaking(self):\r
- # Make sure we leak no resources\r
- if not mswindows:\r
- max_handles = 1026 # too much for most UNIX systems\r
- else:\r
- max_handles = 2050 # too much for (at least some) Windows setups\r
- handles = []\r
- try:\r
- for i in range(max_handles):\r
- try:\r
- handles.append(os.open(test_support.TESTFN,\r
- os.O_WRONLY | os.O_CREAT))\r
- except OSError as e:\r
- if e.errno != errno.EMFILE:\r
- raise\r
- break\r
- else:\r
- self.skipTest("failed to reach the file descriptor limit "\r
- "(tried %d)" % max_handles)\r
- # Close a couple of them (should be enough for a subprocess)\r
- for i in range(10):\r
- os.close(handles.pop())\r
- # Loop creating some subprocesses. If one of them leaks some fds,\r
- # the next loop iteration will fail by reaching the max fd limit.\r
- for i in range(15):\r
- p = subprocess.Popen([sys.executable, "-c",\r
- "import sys;"\r
- "sys.stdout.write(sys.stdin.read())"],\r
- stdin=subprocess.PIPE,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- data = p.communicate(b"lime")[0]\r
- self.assertEqual(data, b"lime")\r
- finally:\r
- for h in handles:\r
- os.close(h)\r
-\r
- def test_list2cmdline(self):\r
- self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),\r
- '"a b c" d e')\r
- self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),\r
- 'ab\\"c \\ d')\r
- self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),\r
- 'ab\\"c " \\\\" d')\r
- self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),\r
- 'a\\\\\\b "de fg" h')\r
- self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),\r
- 'a\\\\\\"b c d')\r
- self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),\r
- '"a\\\\b c" d e')\r
- self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),\r
- '"a\\\\b\\ c" d e')\r
- self.assertEqual(subprocess.list2cmdline(['ab', '']),\r
- 'ab ""')\r
-\r
-\r
- def test_poll(self):\r
- p = subprocess.Popen([sys.executable,\r
- "-c", "import time; time.sleep(1)"])\r
- count = 0\r
- while p.poll() is None:\r
- time.sleep(0.1)\r
- count += 1\r
- # We expect that the poll loop probably went around about 10 times,\r
- # but, based on system scheduling we can't control, it's possible\r
- # poll() never returned None. It "should be" very rare that it\r
- # didn't go around at least twice.\r
- self.assertGreaterEqual(count, 2)\r
- # Subsequent invocations should just return the returncode\r
- self.assertEqual(p.poll(), 0)\r
-\r
-\r
- def test_wait(self):\r
- p = subprocess.Popen([sys.executable,\r
- "-c", "import time; time.sleep(2)"])\r
- self.assertEqual(p.wait(), 0)\r
- # Subsequent invocations should just return the returncode\r
- self.assertEqual(p.wait(), 0)\r
-\r
-\r
- def test_invalid_bufsize(self):\r
- # an invalid type of the bufsize argument should raise\r
- # TypeError.\r
- with self.assertRaises(TypeError):\r
- subprocess.Popen([sys.executable, "-c", "pass"], "orange")\r
-\r
- def test_leaking_fds_on_error(self):\r
- # see bug #5179: Popen leaks file descriptors to PIPEs if\r
- # the child fails to execute; this will eventually exhaust\r
- # the maximum number of open fds. 1024 seems a very common\r
- # value for that limit, but Windows has 2048, so we loop\r
- # 1024 times (each call leaked two fds).\r
- for i in range(1024):\r
- # Windows raises IOError. Others raise OSError.\r
- with self.assertRaises(EnvironmentError) as c:\r
- subprocess.Popen(['nonexisting_i_hope'],\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- # ignore errors that indicate the command was not found\r
- if c.exception.errno not in (errno.ENOENT, errno.EACCES):\r
- raise c.exception\r
-\r
- def test_handles_closed_on_exception(self):\r
- # If CreateProcess exits with an error, ensure the\r
- # duplicate output handles are released\r
- ifhandle, ifname = mkstemp()\r
- ofhandle, ofname = mkstemp()\r
- efhandle, efname = mkstemp()\r
- try:\r
- subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,\r
- stderr=efhandle)\r
- except OSError:\r
- os.close(ifhandle)\r
- os.remove(ifname)\r
- os.close(ofhandle)\r
- os.remove(ofname)\r
- os.close(efhandle)\r
- os.remove(efname)\r
- self.assertFalse(os.path.exists(ifname))\r
- self.assertFalse(os.path.exists(ofname))\r
- self.assertFalse(os.path.exists(efname))\r
-\r
- def test_communicate_epipe(self):\r
- # Issue 10963: communicate() should hide EPIPE\r
- p = subprocess.Popen([sys.executable, "-c", 'pass'],\r
- stdin=subprocess.PIPE,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stderr.close)\r
- self.addCleanup(p.stdin.close)\r
- p.communicate("x" * 2**20)\r
-\r
- def test_communicate_epipe_only_stdin(self):\r
- # Issue 10963: communicate() should hide EPIPE\r
- p = subprocess.Popen([sys.executable, "-c", 'pass'],\r
- stdin=subprocess.PIPE)\r
- self.addCleanup(p.stdin.close)\r
- time.sleep(2)\r
- p.communicate("x" * 2**20)\r
-\r
-# context manager\r
-class _SuppressCoreFiles(object):\r
- """Try to prevent core files from being created."""\r
- old_limit = None\r
-\r
- def __enter__(self):\r
- """Try to save previous ulimit, then set it to (0, 0)."""\r
- try:\r
- import resource\r
- self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)\r
- resource.setrlimit(resource.RLIMIT_CORE, (0, 0))\r
- except (ImportError, ValueError, resource.error):\r
- pass\r
-\r
- if sys.platform == 'darwin':\r
- # Check if the 'Crash Reporter' on OSX was configured\r
- # in 'Developer' mode and warn that it will get triggered\r
- # when it is.\r
- #\r
- # This assumes that this context manager is used in tests\r
- # that might trigger the next manager.\r
- value = subprocess.Popen(['/usr/bin/defaults', 'read',\r
- 'com.apple.CrashReporter', 'DialogType'],\r
- stdout=subprocess.PIPE).communicate()[0]\r
- if value.strip() == b'developer':\r
- print "this tests triggers the Crash Reporter, that is intentional"\r
- sys.stdout.flush()\r
-\r
- def __exit__(self, *args):\r
- """Return core file behavior to default."""\r
- if self.old_limit is None:\r
- return\r
- try:\r
- import resource\r
- resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)\r
- except (ImportError, ValueError, resource.error):\r
- pass\r
-\r
-\r
-@unittest.skipIf(mswindows, "POSIX specific tests")\r
-class POSIXProcessTestCase(BaseTestCase):\r
-\r
- def test_exceptions(self):\r
- # caught & re-raised exceptions\r
- with self.assertRaises(OSError) as c:\r
- p = subprocess.Popen([sys.executable, "-c", ""],\r
- cwd="/this/path/does/not/exist")\r
- # The attribute child_traceback should contain "os.chdir" somewhere.\r
- self.assertIn("os.chdir", c.exception.child_traceback)\r
-\r
- def test_run_abort(self):\r
- # returncode handles signal termination\r
- with _SuppressCoreFiles():\r
- p = subprocess.Popen([sys.executable, "-c",\r
- "import os; os.abort()"])\r
- p.wait()\r
- self.assertEqual(-p.returncode, signal.SIGABRT)\r
-\r
- def test_preexec(self):\r
- # preexec function\r
- p = subprocess.Popen([sys.executable, "-c",\r
- "import sys, os;"\r
- "sys.stdout.write(os.getenv('FRUIT'))"],\r
- stdout=subprocess.PIPE,\r
- preexec_fn=lambda: os.putenv("FRUIT", "apple"))\r
- self.addCleanup(p.stdout.close)\r
- self.assertEqual(p.stdout.read(), "apple")\r
-\r
- def test_args_string(self):\r
- # args is a string\r
- f, fname = mkstemp()\r
- os.write(f, "#!/bin/sh\n")\r
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %\r
- sys.executable)\r
- os.close(f)\r
- os.chmod(fname, 0o700)\r
- p = subprocess.Popen(fname)\r
- p.wait()\r
- os.remove(fname)\r
- self.assertEqual(p.returncode, 47)\r
-\r
- def test_invalid_args(self):\r
- # invalid arguments should raise ValueError\r
- self.assertRaises(ValueError, subprocess.call,\r
- [sys.executable, "-c",\r
- "import sys; sys.exit(47)"],\r
- startupinfo=47)\r
- self.assertRaises(ValueError, subprocess.call,\r
- [sys.executable, "-c",\r
- "import sys; sys.exit(47)"],\r
- creationflags=47)\r
-\r
- def test_shell_sequence(self):\r
- # Run command through the shell (sequence)\r
- newenv = os.environ.copy()\r
- newenv["FRUIT"] = "apple"\r
- p = subprocess.Popen(["echo $FRUIT"], shell=1,\r
- stdout=subprocess.PIPE,\r
- env=newenv)\r
- self.addCleanup(p.stdout.close)\r
- self.assertEqual(p.stdout.read().strip(), "apple")\r
-\r
- def test_shell_string(self):\r
- # Run command through the shell (string)\r
- newenv = os.environ.copy()\r
- newenv["FRUIT"] = "apple"\r
- p = subprocess.Popen("echo $FRUIT", shell=1,\r
- stdout=subprocess.PIPE,\r
- env=newenv)\r
- self.addCleanup(p.stdout.close)\r
- self.assertEqual(p.stdout.read().strip(), "apple")\r
-\r
- def test_call_string(self):\r
- # call() function with string argument on UNIX\r
- f, fname = mkstemp()\r
- os.write(f, "#!/bin/sh\n")\r
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %\r
- sys.executable)\r
- os.close(f)\r
- os.chmod(fname, 0700)\r
- rc = subprocess.call(fname)\r
- os.remove(fname)\r
- self.assertEqual(rc, 47)\r
-\r
- def test_specific_shell(self):\r
- # Issue #9265: Incorrect name passed as arg[0].\r
- shells = []\r
- for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:\r
- for name in ['bash', 'ksh']:\r
- sh = os.path.join(prefix, name)\r
- if os.path.isfile(sh):\r
- shells.append(sh)\r
- if not shells: # Will probably work for any shell but csh.\r
- self.skipTest("bash or ksh required for this test")\r
- sh = '/bin/sh'\r
- if os.path.isfile(sh) and not os.path.islink(sh):\r
- # Test will fail if /bin/sh is a symlink to csh.\r
- shells.append(sh)\r
- for sh in shells:\r
- p = subprocess.Popen("echo $0", executable=sh, shell=True,\r
- stdout=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.assertEqual(p.stdout.read().strip(), sh)\r
-\r
- def _kill_process(self, method, *args):\r
- # Do not inherit file handles from the parent.\r
- # It should fix failures on some platforms.\r
- p = subprocess.Popen([sys.executable, "-c", """if 1:\r
- import sys, time\r
- sys.stdout.write('x\\n')\r
- sys.stdout.flush()\r
- time.sleep(30)\r
- """],\r
- close_fds=True,\r
- stdin=subprocess.PIPE,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- # Wait for the interpreter to be completely initialized before\r
- # sending any signal.\r
- p.stdout.read(1)\r
- getattr(p, method)(*args)\r
- return p\r
-\r
- def test_send_signal(self):\r
- p = self._kill_process('send_signal', signal.SIGINT)\r
- _, stderr = p.communicate()\r
- self.assertIn('KeyboardInterrupt', stderr)\r
- self.assertNotEqual(p.wait(), 0)\r
-\r
- def test_kill(self):\r
- p = self._kill_process('kill')\r
- _, stderr = p.communicate()\r
- self.assertStderrEqual(stderr, '')\r
- self.assertEqual(p.wait(), -signal.SIGKILL)\r
-\r
- def test_terminate(self):\r
- p = self._kill_process('terminate')\r
- _, stderr = p.communicate()\r
- self.assertStderrEqual(stderr, '')\r
- self.assertEqual(p.wait(), -signal.SIGTERM)\r
-\r
- def check_close_std_fds(self, fds):\r
- # Issue #9905: test that subprocess pipes still work properly with\r
- # some standard fds closed\r
- stdin = 0\r
- newfds = []\r
- for a in fds:\r
- b = os.dup(a)\r
- newfds.append(b)\r
- if a == 0:\r
- stdin = b\r
- try:\r
- for fd in fds:\r
- os.close(fd)\r
- out, err = subprocess.Popen([sys.executable, "-c",\r
- 'import sys;'\r
- 'sys.stdout.write("apple");'\r
- 'sys.stdout.flush();'\r
- 'sys.stderr.write("orange")'],\r
- stdin=stdin,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE).communicate()\r
- err = test_support.strip_python_stderr(err)\r
- self.assertEqual((out, err), (b'apple', b'orange'))\r
- finally:\r
- for b, a in zip(newfds, fds):\r
- os.dup2(b, a)\r
- for b in newfds:\r
- os.close(b)\r
-\r
- def test_close_fd_0(self):\r
- self.check_close_std_fds([0])\r
-\r
- def test_close_fd_1(self):\r
- self.check_close_std_fds([1])\r
-\r
- def test_close_fd_2(self):\r
- self.check_close_std_fds([2])\r
-\r
- def test_close_fds_0_1(self):\r
- self.check_close_std_fds([0, 1])\r
-\r
- def test_close_fds_0_2(self):\r
- self.check_close_std_fds([0, 2])\r
-\r
- def test_close_fds_1_2(self):\r
- self.check_close_std_fds([1, 2])\r
-\r
- def test_close_fds_0_1_2(self):\r
- # Issue #10806: test that subprocess pipes still work properly with\r
- # all standard fds closed.\r
- self.check_close_std_fds([0, 1, 2])\r
-\r
- def test_wait_when_sigchild_ignored(self):\r
- # NOTE: sigchild_ignore.py may not be an effective test on all OSes.\r
- sigchild_ignore = test_support.findfile("sigchild_ignore.py",\r
- subdir="subprocessdata")\r
- p = subprocess.Popen([sys.executable, sigchild_ignore],\r
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)\r
- stdout, stderr = p.communicate()\r
- self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"\r
- " non-zero with this error:\n%s" % stderr)\r
-\r
-\r
-@unittest.skipUnless(mswindows, "Windows specific tests")\r
-class Win32ProcessTestCase(BaseTestCase):\r
-\r
- def test_startupinfo(self):\r
- # startupinfo argument\r
- # We uses hardcoded constants, because we do not want to\r
- # depend on win32all.\r
- STARTF_USESHOWWINDOW = 1\r
- SW_MAXIMIZE = 3\r
- startupinfo = subprocess.STARTUPINFO()\r
- startupinfo.dwFlags = STARTF_USESHOWWINDOW\r
- startupinfo.wShowWindow = SW_MAXIMIZE\r
- # Since Python is a console process, it won't be affected\r
- # by wShowWindow, but the argument should be silently\r
- # ignored\r
- subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],\r
- startupinfo=startupinfo)\r
-\r
- def test_creationflags(self):\r
- # creationflags argument\r
- CREATE_NEW_CONSOLE = 16\r
- sys.stderr.write(" a DOS box should flash briefly ...\n")\r
- subprocess.call(sys.executable +\r
- ' -c "import time; time.sleep(0.25)"',\r
- creationflags=CREATE_NEW_CONSOLE)\r
-\r
- def test_invalid_args(self):\r
- # invalid arguments should raise ValueError\r
- self.assertRaises(ValueError, subprocess.call,\r
- [sys.executable, "-c",\r
- "import sys; sys.exit(47)"],\r
- preexec_fn=lambda: 1)\r
- self.assertRaises(ValueError, subprocess.call,\r
- [sys.executable, "-c",\r
- "import sys; sys.exit(47)"],\r
- stdout=subprocess.PIPE,\r
- close_fds=True)\r
-\r
- def test_close_fds(self):\r
- # close file descriptors\r
- rc = subprocess.call([sys.executable, "-c",\r
- "import sys; sys.exit(47)"],\r
- close_fds=True)\r
- self.assertEqual(rc, 47)\r
-\r
- def test_shell_sequence(self):\r
- # Run command through the shell (sequence)\r
- newenv = os.environ.copy()\r
- newenv["FRUIT"] = "physalis"\r
- p = subprocess.Popen(["set"], shell=1,\r
- stdout=subprocess.PIPE,\r
- env=newenv)\r
- self.addCleanup(p.stdout.close)\r
- self.assertIn("physalis", p.stdout.read())\r
-\r
- def test_shell_string(self):\r
- # Run command through the shell (string)\r
- newenv = os.environ.copy()\r
- newenv["FRUIT"] = "physalis"\r
- p = subprocess.Popen("set", shell=1,\r
- stdout=subprocess.PIPE,\r
- env=newenv)\r
- self.addCleanup(p.stdout.close)\r
- self.assertIn("physalis", p.stdout.read())\r
-\r
- def test_call_string(self):\r
- # call() function with string argument on Windows\r
- rc = subprocess.call(sys.executable +\r
- ' -c "import sys; sys.exit(47)"')\r
- self.assertEqual(rc, 47)\r
-\r
- def _kill_process(self, method, *args):\r
- # Some win32 buildbot raises EOFError if stdin is inherited\r
- p = subprocess.Popen([sys.executable, "-c", """if 1:\r
- import sys, time\r
- sys.stdout.write('x\\n')\r
- sys.stdout.flush()\r
- time.sleep(30)\r
- """],\r
- stdin=subprocess.PIPE,\r
- stdout=subprocess.PIPE,\r
- stderr=subprocess.PIPE)\r
- self.addCleanup(p.stdout.close)\r
- self.addCleanup(p.stderr.close)\r
- self.addCleanup(p.stdin.close)\r
- # Wait for the interpreter to be completely initialized before\r
- # sending any signal.\r
- p.stdout.read(1)\r
- getattr(p, method)(*args)\r
- _, stderr = p.communicate()\r
- self.assertStderrEqual(stderr, '')\r
- returncode = p.wait()\r
- self.assertNotEqual(returncode, 0)\r
-\r
- def test_send_signal(self):\r
- self._kill_process('send_signal', signal.SIGTERM)\r
-\r
- def test_kill(self):\r
- self._kill_process('kill')\r
-\r
- def test_terminate(self):\r
- self._kill_process('terminate')\r
-\r
-\r
-@unittest.skipUnless(getattr(subprocess, '_has_poll', False),\r
- "poll system call not supported")\r
-class ProcessTestCaseNoPoll(ProcessTestCase):\r
- def setUp(self):\r
- subprocess._has_poll = False\r
- ProcessTestCase.setUp(self)\r
-\r
- def tearDown(self):\r
- subprocess._has_poll = True\r
- ProcessTestCase.tearDown(self)\r
-\r
-\r
-class HelperFunctionTests(unittest.TestCase):\r
- @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")\r
- def test_eintr_retry_call(self):\r
- record_calls = []\r
- def fake_os_func(*args):\r
- record_calls.append(args)\r
- if len(record_calls) == 2:\r
- raise OSError(errno.EINTR, "fake interrupted system call")\r
- return tuple(reversed(args))\r
-\r
- self.assertEqual((999, 256),\r
- subprocess._eintr_retry_call(fake_os_func, 256, 999))\r
- self.assertEqual([(256, 999)], record_calls)\r
- # This time there will be an EINTR so it will loop once.\r
- self.assertEqual((666,),\r
- subprocess._eintr_retry_call(fake_os_func, 666))\r
- self.assertEqual([(256, 999), (666,), (666,)], record_calls)\r
-\r
-\r
-@unittest.skipUnless(mswindows, "mswindows only")\r
-class CommandsWithSpaces (BaseTestCase):\r
-\r
- def setUp(self):\r
- super(CommandsWithSpaces, self).setUp()\r
- f, fname = mkstemp(".py", "te st")\r
- self.fname = fname.lower ()\r
- os.write(f, b"import sys;"\r
- b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"\r
- )\r
- os.close(f)\r
-\r
- def tearDown(self):\r
- os.remove(self.fname)\r
- super(CommandsWithSpaces, self).tearDown()\r
-\r
- def with_spaces(self, *args, **kwargs):\r
- kwargs['stdout'] = subprocess.PIPE\r
- p = subprocess.Popen(*args, **kwargs)\r
- self.addCleanup(p.stdout.close)\r
- self.assertEqual(\r
- p.stdout.read ().decode("mbcs"),\r
- "2 [%r, 'ab cd']" % self.fname\r
- )\r
-\r
- def test_shell_string_with_spaces(self):\r
- # call() function with string argument with spaces on Windows\r
- self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,\r
- "ab cd"), shell=1)\r
-\r
- def test_shell_sequence_with_spaces(self):\r
- # call() function with sequence argument with spaces on Windows\r
- self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)\r
-\r
- def test_noshell_string_with_spaces(self):\r
- # call() function with string argument with spaces on Windows\r
- self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,\r
- "ab cd"))\r
-\r
- def test_noshell_sequence_with_spaces(self):\r
- # call() function with sequence argument with spaces on Windows\r
- self.with_spaces([sys.executable, self.fname, "ab cd"])\r
-\r
-def test_main():\r
- unit_tests = (ProcessTestCase,\r
- POSIXProcessTestCase,\r
- Win32ProcessTestCase,\r
- ProcessTestCaseNoPoll,\r
- HelperFunctionTests,\r
- CommandsWithSpaces)\r
-\r
- test_support.run_unittest(*unit_tests)\r
- test_support.reap_children()\r
-\r
-if __name__ == "__main__":\r
- test_main()\r